mirror of
https://github.com/PhotonVision/photonvision
synced 2026-06-24 01:31:44 +00:00
FIilter usb devices
This commit is contained in:
@@ -1,12 +1,11 @@
|
||||
import tornado.ioloop
|
||||
|
||||
from app.ChameleonVisionApp import ChameleonApplication
|
||||
from app.classes.Settings import SettingsManager
|
||||
from app.classes.SettingsManager import SettingsManager
|
||||
from tornado.options import options
|
||||
|
||||
if __name__ == "__main__":
|
||||
mng = SettingsManager()
|
||||
m = SettingsManager()
|
||||
tornado.options.parse_command_line()
|
||||
app = ChameleonApplication()
|
||||
print(f"Serving on port {options.port}")
|
||||
|
||||
19
backend/app/classes/CamerasHandler.py
Normal file
19
backend/app/classes/CamerasHandler.py
Normal file
@@ -0,0 +1,19 @@
|
||||
import cscore
|
||||
import cv2
|
||||
|
||||
|
||||
class CamerasHandler:
|
||||
|
||||
@staticmethod
|
||||
def get_cameras():
|
||||
arr = []
|
||||
|
||||
usb_devices = cscore.UsbCamera.enumerateUsbCameras()
|
||||
|
||||
for index in range(len(usb_devices)):
|
||||
cap = cv2.VideoCapture(index)
|
||||
if cap.isOpened():
|
||||
arr.append(index)
|
||||
cap.release()
|
||||
index += 1
|
||||
return [usb_devices[i] for i in arr]
|
||||
@@ -1,10 +1,12 @@
|
||||
import os
|
||||
import json
|
||||
from Singleton import Singleton
|
||||
import cscore
|
||||
import networktables
|
||||
from .Singleton import Singleton
|
||||
from .CamerasHandler import CamerasHandler
|
||||
|
||||
|
||||
class SettingsManager(metaclass=Singleton):
|
||||
|
||||
cams = {}
|
||||
|
||||
def __init__(self):
|
||||
@@ -28,23 +30,28 @@ class SettingsManager(metaclass=Singleton):
|
||||
init_default_settings()
|
||||
|
||||
def _init_cameras(self):
|
||||
if os.path.exists(self.cams_path):
|
||||
for curr_dir in os.listdir(self.cams_path):
|
||||
pipelines = {}
|
||||
for file in os.listdir(os.path.join(self.cams_path, curr_dir)):
|
||||
with open(os.path.join(self.cams_path, curr_dir, file)) as pipeline:
|
||||
pipelines[os.path.splitext(file)[0]] = json.load(pipeline)
|
||||
cameras = CamerasHandler.get_cameras()
|
||||
|
||||
self.cams[curr_dir] = pipelines
|
||||
else:
|
||||
self.create_new_cam("cam1")
|
||||
self.settings["curr_camera"] = "cam1"
|
||||
self.settings["curr_pipeline"] = "pipeline1"
|
||||
for cam in cameras:
|
||||
if os.path.exists(os.path.join(self.cams_path, cam.name)):
|
||||
self.cams[cam.name] = {}
|
||||
self.cams[cam.name]["pipelines"] = {}
|
||||
|
||||
for file in os.listdir(os.path.join(self.cams_path, cam.name)):
|
||||
with open(os.path.join(self.cams_path, cam.name, file)) as pipeline:
|
||||
self.cams[cam.name]["pipelines"][os.path.splitext(file)[0]] = json.load(pipeline)
|
||||
|
||||
if len(self.cams[cam.name]["pipelines"]) == 0:
|
||||
self.create_new_pipeline(cam_name=cam.name)
|
||||
else:
|
||||
self.create_new_cam(cam.name)
|
||||
|
||||
self.cams[cam.name]["path"] = cam.otherPaths[0] if len(cam.otherPaths) == 1 else cam.otherPaths[1]
|
||||
|
||||
# Access methods
|
||||
|
||||
def get_curr_pipeline(self):
|
||||
return self.cams[self.settings["curr_camera"]][self.settings["curr_pipeline"]]
|
||||
return self.cams[self.settings["curr_camera"]]["pipelines"][self.settings["curr_pipeline"]]
|
||||
|
||||
def get_curr_cam(self):
|
||||
return self.cams[self.settings["curr_camera"]]
|
||||
@@ -59,16 +66,23 @@ class SettingsManager(metaclass=Singleton):
|
||||
|
||||
# Creators
|
||||
|
||||
def create_new_pipeline(self, pipe_name, cam=None):
|
||||
if not cam:
|
||||
cam = self.settings["curr_camera"]
|
||||
def create_new_pipeline(self, pipe_name=None, cam_name=None):
|
||||
|
||||
with open(os.path.join(self.settings_path, 'default_pipeline.json')) as default_pipeline:
|
||||
self.cams[cam][pipe_name] = json.load(default_pipeline)
|
||||
if not cam_name:
|
||||
cam_name = self.settings["curr_camera"]
|
||||
|
||||
def create_new_cam(self, cam):
|
||||
self.cams[cam] = {}
|
||||
self.create_new_pipeline('pipeline1', cam)
|
||||
if not pipe_name:
|
||||
pipe_name = "pipeline" + str(len(self.cams[cam_name]))
|
||||
|
||||
self.cams[cam_name]["pipelines"][pipe_name] = {
|
||||
"exposure": 50,
|
||||
"brightness": 50
|
||||
}
|
||||
|
||||
def create_new_cam(self, cam_name):
|
||||
self.cams[cam_name] = {}
|
||||
self.cams[cam_name]["pipelines"] = {}
|
||||
self.create_new_pipeline(cam_name=cam_name)
|
||||
|
||||
# Savers
|
||||
|
||||
@@ -85,4 +99,4 @@ class SettingsManager(metaclass=Singleton):
|
||||
|
||||
def save_settings(self):
|
||||
with open(os.path.join(self.settings_path, 'settings.json'), 'w+') as setting_file:
|
||||
json.dump(self.settings, setting_file)
|
||||
json.dump(self.settings, setting_file)
|
||||
@@ -1,6 +1,7 @@
|
||||
import tornado.websocket
|
||||
import json
|
||||
import os
|
||||
from ..classes.SettingsManager import SettingsManager
|
||||
|
||||
|
||||
class ChameleonWebSocket(tornado.websocket.WebSocketHandler):
|
||||
@@ -11,6 +12,7 @@ class ChameleonWebSocket(tornado.websocket.WebSocketHandler):
|
||||
|
||||
def __init__(self, application, request, **kwargs):
|
||||
super().__init__(application, request, **kwargs)
|
||||
self.smng = SettingsManager()
|
||||
self.settings_path = os.path.join(os.getcwd(), "settings")
|
||||
self.cams_path = os.path.join(self.settings_path, "cams")
|
||||
self.init_settings()
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
{
|
||||
"exposure": 50,
|
||||
"brightness": 50
|
||||
}
|
||||
Reference in New Issue
Block a user