mirror of
https://github.com/PhotonVision/photonvision
synced 2026-07-05 03:21:40 +00:00
Merged Cameras handler into settings manager
This commit is contained in:
@@ -8,8 +8,9 @@ from .Exceptions import PipelineAlreadyExistsException, NoCameraConnectedExcepti
|
|||||||
|
|
||||||
|
|
||||||
class SettingsManager(metaclass=Singleton):
|
class SettingsManager(metaclass=Singleton):
|
||||||
cams = {} #our cameras settings
|
cams = {}
|
||||||
cams_info = {} #cscore USB camera objects
|
usb_cameras = {}
|
||||||
|
usb_cameras_info = {}
|
||||||
general_settings = {}
|
general_settings = {}
|
||||||
|
|
||||||
default_pipeline = {
|
default_pipeline = {
|
||||||
@@ -40,8 +41,9 @@ class SettingsManager(metaclass=Singleton):
|
|||||||
self.settings_path = os.path.join(os.getcwd(), "settings")
|
self.settings_path = os.path.join(os.getcwd(), "settings")
|
||||||
self.cams_path = os.path.join(self.settings_path, "cams")
|
self.cams_path = os.path.join(self.settings_path, "cams")
|
||||||
self._init_general_settings()
|
self._init_general_settings()
|
||||||
|
self._init_cameras_info()
|
||||||
|
self._init_usb_cameras()
|
||||||
self._init_cameras()
|
self._init_cameras()
|
||||||
self._init_camera()
|
|
||||||
|
|
||||||
if self.general_settings["curr_camera"] not in self.cams and len(self.cams) > 0:
|
if self.general_settings["curr_camera"] not in self.cams and len(self.cams) > 0:
|
||||||
cam_name = list(self.cams.keys())[0]
|
cam_name = list(self.cams.keys())[0]
|
||||||
@@ -55,6 +57,7 @@ class SettingsManager(metaclass=Singleton):
|
|||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
self.general_settings = self.default_general_settings.copy()
|
self.general_settings = self.default_general_settings.copy()
|
||||||
|
|
||||||
|
# Initiate our camera's settings
|
||||||
def _init_cameras(self):
|
def _init_cameras(self):
|
||||||
cameras = self._get_cameras_info()
|
cameras = self._get_cameras_info()
|
||||||
|
|
||||||
@@ -72,7 +75,7 @@ class SettingsManager(metaclass=Singleton):
|
|||||||
if "path" not in self.cams[cam.name]:
|
if "path" not in self.cams[cam.name]:
|
||||||
self.cams[cam.name]["path"] = cam.otherPaths[0] if len(cam.otherPaths) == 1 else cam.otherPaths[1]
|
self.cams[cam.name]["path"] = cam.otherPaths[0] if len(cam.otherPaths) == 1 else cam.otherPaths[1]
|
||||||
if "video_mode" not in self.cams[cam.name]:
|
if "video_mode" not in self.cams[cam.name]:
|
||||||
video_mode: VideoMode = self.cams[cam.name].enumerateVideoModes()[0]
|
video_mode: VideoMode = self.cams(cam.name).enumerateVideoModes()[0]
|
||||||
self.cams[cam.name]["video_mode"] = {
|
self.cams[cam.name]["video_mode"] = {
|
||||||
"fps": video_mode.fps,
|
"fps": video_mode.fps,
|
||||||
"width": video_mode.width,
|
"width": video_mode.width,
|
||||||
@@ -80,60 +83,59 @@ class SettingsManager(metaclass=Singleton):
|
|||||||
"pixel_format": str(video_mode.pixelFormat).split('.')[1]
|
"pixel_format": str(video_mode.pixelFormat).split('.')[1]
|
||||||
}
|
}
|
||||||
|
|
||||||
def _get_cameras_info(self):
|
# Initiate true usb cameras(filters microphones and double cameras)
|
||||||
arr = []
|
def _init_cameras_info(self):
|
||||||
|
true_cameras = []
|
||||||
usb_devices = cscore.UsbCamera.enumerateUsbCameras()
|
usb_devices = cscore.UsbCamera.enumerateUsbCameras()
|
||||||
|
|
||||||
for index in range(len(usb_devices)):
|
for index in range(len(usb_devices)):
|
||||||
cap = cv2.VideoCapture(index)
|
cap = cv2.VideoCapture(index)
|
||||||
if cap.isOpened():
|
if cap.isOpened():
|
||||||
arr.append(index)
|
true_cameras.append(index)
|
||||||
cap.release()
|
cap.release()
|
||||||
index += 1
|
index += 1
|
||||||
|
|
||||||
return [usb_devices[i] for i in arr]
|
for index in true_cameras:
|
||||||
|
self.usb_cameras_info[usb_devices[index].name] = usb_devices[index]
|
||||||
|
|
||||||
def _get_or_start_cameras(self):
|
# Initiate cscore usb devices
|
||||||
for device in self.cams_info:
|
def _init_usb_cameras(self):
|
||||||
|
for device in self.usb_cameras_info:
|
||||||
|
|
||||||
device_name = device.name
|
device_name = device.name
|
||||||
|
|
||||||
if device.name in self.cams_info:
|
if device_name in self.usb_cameras_info:
|
||||||
suffix = 0
|
suffix = 0
|
||||||
device_name = device.name + str(suffix)
|
device_name = device.name + str(suffix)
|
||||||
|
|
||||||
while device_name in self.cams:
|
while device_name in self.usb_cameras:
|
||||||
suffix +=1
|
suffix += 1
|
||||||
device_name = "pipeline" + str(suffix)
|
device_name = "pipeline" + str(suffix)
|
||||||
|
|
||||||
camera = cscore.UsbCamera(name=device_name, dev=device.dev)
|
camera = cscore.UsbCamera(name=device_name, dev=device.dev)
|
||||||
camera.setPixelFormat(pixelFormat=
|
camera.setPixelFormat(pixelFormat=
|
||||||
getattr(VideoMode.PixelFormat,
|
getattr(VideoMode.PixelFormat,
|
||||||
self.get_curr_cam()["video_mode"]["pixel_format"]))
|
self.get_curr_cam()["video_mode"]["pixel_format"]))
|
||||||
camera.setFPS(self.get_curr_cam()["video_mode"]["fps"])
|
camera.setFPS(self.get_curr_cam()["video_mode"]["fps"])
|
||||||
camera.setResolution(width=self.get_curr_cam()["video_mode"]["width"],
|
camera.setResolution(width=self.get_curr_cam()["video_mode"]["width"],
|
||||||
height=self.get_curr_cam()["video_mode"]["height"])
|
height=self.get_curr_cam()["video_mode"]["height"])
|
||||||
self.cams[device_name] = camera
|
|
||||||
|
|
||||||
def _init_camera(self):
|
self.usb_cameras[device_name] = camera
|
||||||
return self._get_or_start_cameras()
|
|
||||||
|
|
||||||
def _get_usb_camera_by_name(self):
|
# Change usb camera settings
|
||||||
pass
|
def set_camera_settings(self, camera_name, dic):
|
||||||
|
|
||||||
def set_camera_settings(self,usb_camera:cscore.UsbCamera, dic):
|
|
||||||
|
|
||||||
if "brightness" in dic:
|
if "brightness" in dic:
|
||||||
usb_camera.setBrightness(dic["brightness"])
|
self.usb_cameras[camera_name].setBrightness(dic["brightness"])
|
||||||
|
|
||||||
if "exposure" in dic:
|
if "exposure" in dic:
|
||||||
usb_camera.setExposureManual(dic["exposure"])
|
self.usb_cameras[camera_name].setExposureManual(dic["exposure"])
|
||||||
|
|
||||||
if "video_mode" in dic:
|
if "video_mode" in dic:
|
||||||
usb_camera.setVideoMode(dic["video_mode"])
|
self.usb_cameras[camera_name].setVideoMode(dic["video_mode"])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Access methods
|
# Access methods
|
||||||
|
|
||||||
def get_curr_pipeline(self):
|
def get_curr_pipeline(self):
|
||||||
if self.general_settings["curr_pipeline"]:
|
if self.general_settings["curr_pipeline"]:
|
||||||
return self.cams[self.general_settings["curr_camera"]]["pipelines"][self.general_settings["curr_pipeline"]]
|
return self.cams[self.general_settings["curr_camera"]]["pipelines"][self.general_settings["curr_pipeline"]]
|
||||||
|
|||||||
@@ -24,35 +24,35 @@ class CamerasHandler:
|
|||||||
#
|
#
|
||||||
# return getattr(CamerasHandler, "cams_info")
|
# return getattr(CamerasHandler, "cams_info")
|
||||||
|
|
||||||
@staticmethod
|
# @staticmethod
|
||||||
def get_or_start_cameras(usb_devices):
|
# def get_or_start_cameras(usb_devices):
|
||||||
|
#
|
||||||
if not getattr(CamerasHandler, "cams", False):
|
# if not getattr(CamerasHandler, "cams", False):
|
||||||
cameras = {}
|
# cameras = {}
|
||||||
for device in usb_devices:
|
# for device in usb_devices:
|
||||||
device_name = device.name
|
# device_name = device.name
|
||||||
|
#
|
||||||
if device.name in cameras:
|
# if device.name in cameras:
|
||||||
suffix = 0
|
# suffix = 0
|
||||||
device_name = device.name + str(suffix)
|
# device_name = device.name + str(suffix)
|
||||||
|
#
|
||||||
while device_name in cameras:
|
# while device_name in cameras:
|
||||||
suffix += 1
|
# suffix += 1
|
||||||
device_name = "pipeline" + str(suffix)
|
# device_name = "pipeline" + str(suffix)
|
||||||
|
#
|
||||||
camera = cscore.UsbCamera(name=device_name, dev=device.dev)
|
# camera = cscore.UsbCamera(name=device_name, dev=device.dev)
|
||||||
camera.setPixelFormat(pixelFormat=
|
# camera.setPixelFormat(pixelFormat=
|
||||||
getattr(VideoMode.PixelFormat, SettingsManager()
|
# getattr(VideoMode.PixelFormat, SettingsManager()
|
||||||
.get_curr_cam()["video_mode"]["pixel_format"]))
|
# .get_curr_cam()["video_mode"]["pixel_format"]))
|
||||||
camera.setFPS(SettingsManager().get_curr_cam()["video_mode"]["fps"])
|
# camera.setFPS(SettingsManager().get_curr_cam()["video_mode"]["fps"])
|
||||||
camera.setResolution(width=SettingsManager().get_curr_cam()["video_mode"]["width"],
|
# camera.setResolution(width=SettingsManager().get_curr_cam()["video_mode"]["width"],
|
||||||
height=SettingsManager().get_curr_cam()["video_mode"]["height"])
|
# height=SettingsManager().get_curr_cam()["video_mode"]["height"])
|
||||||
|
#
|
||||||
cameras[device_name] = camera
|
# cameras[device_name] = camera
|
||||||
|
#
|
||||||
setattr(CamerasHandler, "cams", cameras)
|
# setattr(CamerasHandler, "cams", cameras)
|
||||||
|
#
|
||||||
return getattr(CamerasHandler, "cams")
|
# return getattr(CamerasHandler, "cams")
|
||||||
|
|
||||||
# @staticmethod
|
# @staticmethod
|
||||||
# def init_camera():
|
# def init_camera():
|
||||||
|
|||||||
@@ -83,6 +83,5 @@ class ChameleonWebSocket(tornado.websocket.WebSocketHandler):
|
|||||||
|
|
||||||
for key in self.set_this_camera_settings:
|
for key in self.set_this_camera_settings:
|
||||||
if key in dic:
|
if key in dic:
|
||||||
CamerasHandler.set_camera_settings(
|
self.settings_manager.set_camera_settings(self.settings_manager.general_settings["curr_camera"],
|
||||||
CamerasHandler.get_usb_camera_by_name(self.settings_manager.general_settings["curr_camera"]),
|
dic[key])
|
||||||
dic[key])
|
|
||||||
|
|||||||
Reference in New Issue
Block a user