mirror of
https://github.com/PhotonVision/photonvision
synced 2026-06-21 01:01:41 +00:00
fixed settings manager where no camera is connected
didn't check yet
This commit is contained in:
@@ -3,3 +3,9 @@ class PipelineAlreadyExistsException(Exception):
|
||||
|
||||
def __init__(self, pipe_name):
|
||||
super(f"Pipeline {pipe_name} already exists")
|
||||
|
||||
|
||||
class NoCameraConnectedException(Exception):
|
||||
|
||||
def __init__(self):
|
||||
super("No camera as been detected")
|
||||
|
||||
@@ -3,7 +3,7 @@ import json
|
||||
from .Singleton import Singleton
|
||||
# from .handlers.CamerasHandler import CamerasHandler
|
||||
from ..handlers.CamerasHandler import CamerasHandler
|
||||
from .Exceptions import PipelineAlreadyExistsException
|
||||
from .Exceptions import PipelineAlreadyExistsException, NoCameraConnectedException
|
||||
|
||||
|
||||
class SettingsManager(metaclass=Singleton):
|
||||
@@ -42,6 +42,7 @@ class SettingsManager(metaclass=Singleton):
|
||||
|
||||
if self.general_settings["curr_camera"] not in self.cams and len(list(self.cams.keys())) > 0:
|
||||
self.general_settings["curr_camera"] = list(self.cams.keys())[0]
|
||||
self.general_settings["curr_pipeline"] = list(list(self.cams.keys())[0]["pipelines"].keys())[0]
|
||||
|
||||
def _init_general_settings(self):
|
||||
try:
|
||||
@@ -54,28 +55,32 @@ class SettingsManager(metaclass=Singleton):
|
||||
cameras = CamerasHandler.get_cameras_info()
|
||||
|
||||
for cam in cameras:
|
||||
if os.path.exists(os.path.join(self.cams_path, cam.name)):
|
||||
self.cams[cam.name] = {}
|
||||
self.cams[cam.name]["pipelines"] = {}
|
||||
if os.path.exists(os.path.join(self.cams_path, cam.name + '.json')):
|
||||
|
||||
for file in os.listdir(os.path.join(self.cams_path, cam.name)):
|
||||
with open(os.path.join(self.cams_path, cam.name, file)) as pipeline:
|
||||
self.cams[cam.name]["pipelines"][os.path.splitext(file)[0]] = json.load(pipeline)
|
||||
with open(os.path.join(self.cams_path, cam.name + '.json'), 'r') as camera:
|
||||
self.cams[cam.name] = json.load(camera)
|
||||
|
||||
if len(self.cams[cam.name]["pipelines"]) == 0:
|
||||
self.create_new_pipeline(cam_name=cam.name)
|
||||
else:
|
||||
self.create_new_cam(cam.name)
|
||||
|
||||
self.cams[cam.name]["path"] = cam.otherPaths[0] if len(cam.otherPaths) == 1 else cam.otherPaths[1]
|
||||
if not self.cams[cam.name]["path"]:
|
||||
self.cams[cam.name]["path"] = cam.otherPaths[0] if len(cam.otherPaths) == 1 else cam.otherPaths[1]
|
||||
|
||||
# Access methods
|
||||
|
||||
def get_curr_pipeline(self):
|
||||
return self.cams[self.general_settings["curr_camera"]]["pipelines"][self.general_settings["curr_pipeline"]]
|
||||
if self.general_settings["curr_pipeline"]:
|
||||
return self.cams[self.general_settings["curr_camera"]]["pipelines"][self.general_settings["curr_pipeline"]]
|
||||
|
||||
raise NoCameraConnectedException()
|
||||
|
||||
def get_curr_cam(self):
|
||||
return self.cams[self.general_settings["curr_camera"]]
|
||||
if self.general_settings["curr_camera"]:
|
||||
return self.cams[self.general_settings["curr_camera"]]
|
||||
|
||||
raise NoCameraConnectedException()
|
||||
|
||||
def set_curr_camera(self, cam_name):
|
||||
if cam_name in self.cams:
|
||||
@@ -134,14 +139,8 @@ class SettingsManager(metaclass=Singleton):
|
||||
|
||||
def _save_cameras(self):
|
||||
for cam in self.cams:
|
||||
for pipeline in self.cams[cam]:
|
||||
path = os.path.join(self.cams_path, cam)
|
||||
|
||||
if not os.path.exists(path):
|
||||
os.makedirs(path)
|
||||
|
||||
with open(os.path.join(path, pipeline + '.json'), 'w+') as pipeline_file:
|
||||
json.dump(self.cams[cam][pipeline], pipeline_file)
|
||||
with open(os.path.join(self.cams_path, cam + '.json'), 'w+') as camera:
|
||||
json.dump(self.cams[cam], camera)
|
||||
|
||||
def _save_general_settings(self):
|
||||
with open(os.path.join(self.settings_path, 'settings.json'), 'w+') as setting_file:
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import tornado.websocket
|
||||
import json
|
||||
|
||||
from Exceptions import NoCameraConnectedException
|
||||
from .CamerasHandler import CamerasHandler
|
||||
from ..classes.SettingsManager import SettingsManager
|
||||
|
||||
@@ -43,11 +45,20 @@ class ChameleonWebSocket(tornado.websocket.WebSocketHandler):
|
||||
return True
|
||||
|
||||
def send_curr_cam(self):
|
||||
self.write_message(self.settings_manager.get_curr_pipeline())
|
||||
try:
|
||||
self.write_message(self.settings_manager.get_curr_pipeline())
|
||||
except NoCameraConnectedException:
|
||||
# TODO: return something if no camera connected
|
||||
self.write_message(None)
|
||||
|
||||
def send_full_settings(self):
|
||||
full_settings = self.settings_manager.general_settings.copy()
|
||||
full_settings["data"] = self.settings_manager.get_curr_pipeline()
|
||||
|
||||
try:
|
||||
full_settings["data"] = self.settings_manager.get_curr_pipeline()
|
||||
except NoCameraConnectedException:
|
||||
# TODO: return something if no camera connected
|
||||
full_settings["data"] = None
|
||||
|
||||
self.write_message(full_settings)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user