mirror of
https://github.com/PhotonVision/photonvision
synced 2026-06-26 01:51:40 +00:00
finished settings manager and socket handler
This commit is contained in:
@@ -3,7 +3,7 @@ import tornado.ioloop
|
||||
from app.ChameleonVisionApp import ChameleonApplication
|
||||
from app.classes.SettingsManager import SettingsManager
|
||||
from tornado.options import options
|
||||
from classes.CamerasHandler import CamerasHandler
|
||||
from handlers.CamerasHandler import CamerasHandler
|
||||
|
||||
if __name__ == "__main__":
|
||||
mng = SettingsManager()
|
||||
|
||||
@@ -1,15 +1,23 @@
|
||||
import os
|
||||
import json
|
||||
import cscore
|
||||
import networktables
|
||||
from .Singleton import Singleton
|
||||
from .CamerasHandler import CamerasHandler
|
||||
from handlers.CamerasHandler import CamerasHandler
|
||||
from .Exceptions import PipelineAlreadyExistsException
|
||||
|
||||
|
||||
class SettingsManager(metaclass=Singleton):
|
||||
cams = {}
|
||||
settings = {}
|
||||
general_settings = {}
|
||||
|
||||
default_pipeline = {
|
||||
"exposure": 50,
|
||||
"brightness": 50
|
||||
}
|
||||
default_general_settings = {
|
||||
"team_number": 1577,
|
||||
"curr_camera": "cam1",
|
||||
"curr_pipeline": "pipeline1"
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.settings_path = os.path.join(os.getcwd(), "settings")
|
||||
@@ -17,20 +25,19 @@ class SettingsManager(metaclass=Singleton):
|
||||
self._init_general_settings()
|
||||
self._init_cameras()
|
||||
|
||||
if not self.cams[self.general_settings["curr_camera"]]:
|
||||
self.general_settings["curr_camera"] = list(self.cams.keys())[0]
|
||||
|
||||
def _init_general_settings(self):
|
||||
try:
|
||||
with open(os.path.join(self.settings_path, 'settings.json')) as setting_file:
|
||||
self.settings = json.load(setting_file)
|
||||
self.general_settings = json.load(setting_file)
|
||||
except FileNotFoundError:
|
||||
self.settings = {
|
||||
"team_number": 1577,
|
||||
"curr_camera": "cam1",
|
||||
"curr_pipeline": "pipeline1"
|
||||
}
|
||||
self.general_settings = self.default_general_settings.copy()
|
||||
|
||||
def _init_cameras(self):
|
||||
cameras = CamerasHandler.get_cameras()
|
||||
|
||||
|
||||
for cam in cameras:
|
||||
if os.path.exists(os.path.join(self.cams_path, cam.name)):
|
||||
self.cams[cam.name] = {}
|
||||
@@ -50,25 +57,42 @@ class SettingsManager(metaclass=Singleton):
|
||||
# Access methods
|
||||
|
||||
def get_curr_pipeline(self):
|
||||
return self.cams[self.settings["curr_camera"]]["pipelines"][self.settings["curr_pipeline"]]
|
||||
return self.cams[self.general_settings["curr_camera"]]["pipelines"][self.general_settings["curr_pipeline"]]
|
||||
|
||||
def get_curr_cam(self):
|
||||
return self.cams[self.settings["curr_camera"]]
|
||||
return self.cams[self.general_settings["curr_camera"]]
|
||||
|
||||
def set_curr_camera(self, cam_name):
|
||||
if cam_name in self.cams:
|
||||
self.settings["curr_camera"] = cam_name
|
||||
self.general_settings["curr_camera"] = cam_name
|
||||
|
||||
def set_curr_pipeline(self, pipe_name):
|
||||
if pipe_name in self.cams:
|
||||
self.settings["curr_pipeline"] = pipe_name
|
||||
if pipe_name in self.get_curr_cam()["pipelines"]:
|
||||
self.general_settings["curr_pipeline"] = pipe_name
|
||||
|
||||
def change_pipeline_values(self, dic, cam_name=None, pipe_name=None):
|
||||
|
||||
if not cam_name:
|
||||
cam_name = self.general_settings["curr_camera"]
|
||||
|
||||
if not pipe_name:
|
||||
pipe_name = self.general_settings["curr_pipeline"]
|
||||
|
||||
for key in dic:
|
||||
if self.default_pipeline[key]:
|
||||
self.cams[cam_name]["pipelines"][pipe_name][key] = dic[key]
|
||||
|
||||
def change_general_settings_values(self, dic):
|
||||
for key in dic:
|
||||
if self.default_general_settings[key]:
|
||||
self.general_settings[key] = dic[key]
|
||||
|
||||
# Creators
|
||||
|
||||
def create_new_pipeline(self, pipe_name=None, cam_name=None):
|
||||
|
||||
if not cam_name:
|
||||
cam_name = self.settings["curr_camera"]
|
||||
cam_name = self.general_settings["curr_camera"]
|
||||
|
||||
if not pipe_name:
|
||||
suffix = 0
|
||||
@@ -80,10 +104,7 @@ class SettingsManager(metaclass=Singleton):
|
||||
elif self.cams[cam_name]["pipelines"][pipe_name]:
|
||||
raise PipelineAlreadyExistsException(pipe_name)
|
||||
|
||||
self.cams[cam_name]["pipelines"][pipe_name] = {
|
||||
"exposure": 50,
|
||||
"brightness": 50
|
||||
}
|
||||
self.cams[cam_name]["pipelines"][pipe_name] = self.default_pipeline.copy()
|
||||
|
||||
def create_new_cam(self, cam_name):
|
||||
self.cams[cam_name] = {}
|
||||
@@ -92,7 +113,11 @@ class SettingsManager(metaclass=Singleton):
|
||||
|
||||
# Savers
|
||||
|
||||
def save_cameras(self):
|
||||
def save_settings(self):
|
||||
self._save_cameras()
|
||||
self._save_general_settings()
|
||||
|
||||
def _save_cameras(self):
|
||||
for cam in self.cams:
|
||||
for pipeline in self.cams[cam]:
|
||||
path = os.path.join(self.cams_path, cam)
|
||||
@@ -103,6 +128,6 @@ class SettingsManager(metaclass=Singleton):
|
||||
with open(os.path.join(path, pipeline + '.json'), 'w+') as pipeline_file:
|
||||
json.dump(self.cams[cam][pipeline], pipeline_file)
|
||||
|
||||
def save_settings(self):
|
||||
def _save_general_settings(self):
|
||||
with open(os.path.join(self.settings_path, 'settings.json'), 'w+') as setting_file:
|
||||
json.dump(self.settings, setting_file)
|
||||
json.dump(self.general_settings, setting_file)
|
||||
|
||||
@@ -7,50 +7,21 @@ from ..classes.SettingsManager import SettingsManager
|
||||
class ChameleonWebSocket(tornado.websocket.WebSocketHandler):
|
||||
|
||||
actions = {}
|
||||
settings = {}
|
||||
cams = {}
|
||||
|
||||
def __init__(self, application, request, **kwargs):
|
||||
super().__init__(application, request, **kwargs)
|
||||
self.smng = SettingsManager()
|
||||
self.settings_path = os.path.join(os.getcwd(), "settings")
|
||||
self.cams_path = os.path.join(self.settings_path, "cams")
|
||||
self.init_settings()
|
||||
self.settings_manager = SettingsManager()
|
||||
self.init_actions()
|
||||
|
||||
# Init methods
|
||||
|
||||
def init_settings(self):
|
||||
with open(os.path.join(self.settings_path, 'settings.json')) as setting_file:
|
||||
self.settings = json.load(setting_file)
|
||||
|
||||
self.init_cameras_settings()
|
||||
|
||||
def init_cameras_settings(self):
|
||||
if os.path.exists(self.cams_path):
|
||||
for curr_dir in os.listdir(self.cams_path):
|
||||
pipelines = {}
|
||||
for file in os.listdir(os.path.join(self.cams_path, curr_dir)):
|
||||
with open(os.path.join(self.cams_path, curr_dir, file)) as pipeline:
|
||||
pipelines[os.path.splitext(file)[0]] = json.load(pipeline)
|
||||
|
||||
self.cams[curr_dir] = pipelines
|
||||
else:
|
||||
self.create_new_cam("cam1")
|
||||
self.settings["curr_camera"] = "cam1"
|
||||
self.settings["curr_pipeline"] = "pipeline1"
|
||||
|
||||
def init_actions(self):
|
||||
self.actions["camera"] = self.change_curr_camera
|
||||
self.actions["pipeline"] = self.change_curr_pipeline
|
||||
self.actions["change_pipeline_values"] = self.change_pipeline_values
|
||||
|
||||
# Socket methods
|
||||
self.actions["change_pipeline_values"] = self.settings_manager.change_pipeline_values
|
||||
self.actions["change_general_settings_values"] = self.settings_manager.change_general_settings_values
|
||||
self.actions["change_cam"] = self.change_curr_camera
|
||||
self.actions["change_pipeline"] = self.change_curr_pipeline
|
||||
|
||||
def open(self):
|
||||
|
||||
# TODO: send full settings(team number....)
|
||||
self.send_curr_cam()
|
||||
self.send_full_settings()
|
||||
|
||||
print("WebSocket opened")
|
||||
|
||||
@@ -59,85 +30,31 @@ class ChameleonWebSocket(tornado.websocket.WebSocketHandler):
|
||||
message_dic = json.loads(message)
|
||||
|
||||
for key in message_dic:
|
||||
self.actions.get(key, self.actions["change_pipeline_values"])(message_dic)
|
||||
self.actions.get(key, self.actions["change_pipeline_values"])(message_dic[key])
|
||||
|
||||
print(message)
|
||||
|
||||
def on_close(self):
|
||||
self.save_settings()
|
||||
self.save_cameras()
|
||||
self.settings_manager.save_settings()
|
||||
|
||||
print("WebSocket closed")
|
||||
|
||||
def check_origin(self, origin):
|
||||
return True
|
||||
|
||||
def send_curr_cam(self):
|
||||
self.write_message(self.get_curr_pipeline())
|
||||
self.write_message(self.settings_manager.get_curr_pipeline())
|
||||
|
||||
# Access methods
|
||||
def send_full_settings(self):
|
||||
full_settings = self.settings_manager.general_settings.copy()
|
||||
full_settings["data"] = self.settings_manager.get_curr_pipeline()
|
||||
|
||||
def get_curr_pipeline(self):
|
||||
return self.cams[self.settings["curr_camera"]][self.settings["curr_pipeline"]]
|
||||
|
||||
def get_curr_cam(self):
|
||||
return self.cams[self.settings["curr_camera"]]
|
||||
|
||||
def set_curr_camera(self, cam_name):
|
||||
if cam_name in self.cams:
|
||||
self.settings["curr_camera"] = cam_name
|
||||
|
||||
def set_curr_pipeline(self, pipe_name):
|
||||
if pipe_name in self.cams:
|
||||
self.settings["curr_pipeline"] = pipe_name
|
||||
|
||||
# Actions
|
||||
|
||||
def change_pipeline_values(self, dic):
|
||||
for key in dic:
|
||||
self.get_curr_pipeline()[key] = dic[key]
|
||||
self.write_message(full_settings)
|
||||
|
||||
def change_curr_camera(self, dic):
|
||||
cam_name = 'cam' + dic["camera"]
|
||||
|
||||
if cam_name not in self.cams:
|
||||
self.create_new_cam(cam_name)
|
||||
|
||||
self.set_curr_camera(cam_name)
|
||||
self.send_curr_cam()
|
||||
self.settings_manager.set_curr_camera(cam_name=dic["cam"])
|
||||
|
||||
def change_curr_pipeline(self, dic):
|
||||
pipe_name = 'pipeline' + dic["pipeline"]
|
||||
self.settings_manager.set_curr_pipeline(pipe_name=dic["pipeline"])
|
||||
|
||||
if pipe_name not in self.get_curr_cam():
|
||||
self.create_new_pipeline(pipe_name)
|
||||
|
||||
self.set_curr_pipeline(pipe_name)
|
||||
|
||||
# Creators
|
||||
|
||||
def create_new_pipeline(self, pipe_name, cam=None):
|
||||
if not cam:
|
||||
cam = self.settings["curr_camera"]
|
||||
|
||||
with open(os.path.join(self.settings_path, 'default_pipeline.json')) as default_pipeline:
|
||||
self.cams[cam][pipe_name] = json.load(default_pipeline)
|
||||
|
||||
def create_new_cam(self, cam):
|
||||
self.cams[cam] = {}
|
||||
self.create_new_pipeline('pipeline1', cam)
|
||||
|
||||
# Save data functions
|
||||
|
||||
def save_cameras(self):
|
||||
for cam in self.cams:
|
||||
for pipeline in self.cams[cam]:
|
||||
path = os.path.join(self.cams_path, cam)
|
||||
|
||||
if not os.path.exists(path):
|
||||
os.makedirs(path)
|
||||
|
||||
with open(os.path.join(path, pipeline + '.json'), 'w+') as pipeline_file:
|
||||
json.dump(self.cams[cam][pipeline], pipeline_file)
|
||||
|
||||
def save_settings(self):
|
||||
with open(os.path.join(self.settings_path, 'settings.json'), 'w') as setting_file:
|
||||
json.dump(self.settings, setting_file)
|
||||
|
||||
@@ -66,7 +66,7 @@ class VisionHandler:
|
||||
|
||||
def run(self):
|
||||
camera_server = cscore.CameraServer.getInstance()
|
||||
networktables.NetworkTables.startClientTeam(team=SettingsManager.settings["team_number"])
|
||||
networktables.NetworkTables.startClientTeam(team=SettingsManager.general_settings["team_number"])
|
||||
networktables.NetworkTables.initialize()
|
||||
|
||||
def camera_process(cv_sink, stream):
|
||||
|
||||
Reference in New Issue
Block a user