mirror of
https://github.com/PhotonVision/photonvision
synced 2026-07-01 02:41:42 +00:00
integration of camera handler into settings manager
This commit is contained in:
@@ -7,11 +7,12 @@ from app.handlers.CamerasHandler import CamerasHandler
|
||||
from app.handlers.VisionHandler import VisionHandler
|
||||
|
||||
if __name__ == "__main__":
|
||||
# SettingsManager()
|
||||
CamerasHandler.init_camera()
|
||||
SettingsManager()
|
||||
# CamerasHandler.init_camera()
|
||||
|
||||
|
||||
#VisionHandler().run()
|
||||
SettingsManager().save_settings()
|
||||
# SettingsManager().save_settings()
|
||||
|
||||
tornado.options.parse_command_line()
|
||||
app = ChameleonApplication()
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
import os
|
||||
import json
|
||||
|
||||
import cv2
|
||||
import cscore
|
||||
from cscore._cscore import VideoMode
|
||||
|
||||
from .Singleton import Singleton
|
||||
from ..handlers.CamerasHandler import CamerasHandler
|
||||
from .Exceptions import PipelineAlreadyExistsException, NoCameraConnectedException
|
||||
|
||||
|
||||
class SettingsManager(metaclass=Singleton):
|
||||
cams = {}
|
||||
cams_info = {}
|
||||
general_settings = {}
|
||||
|
||||
default_pipeline = {
|
||||
@@ -37,10 +37,13 @@ class SettingsManager(metaclass=Singleton):
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
self.settings_path = os.path.join(os.getcwd(), "settings")
|
||||
self.cams_path = os.path.join(self.settings_path, "cams")
|
||||
self._init_general_settings()
|
||||
self._init_cameras()
|
||||
self._init_camera()
|
||||
|
||||
if self.general_settings["curr_camera"] not in self.cams and len(self.cams) > 0:
|
||||
cam_name = list(self.cams.keys())[0]
|
||||
@@ -55,7 +58,7 @@ class SettingsManager(metaclass=Singleton):
|
||||
self.general_settings = self.default_general_settings.copy()
|
||||
|
||||
def _init_cameras(self):
|
||||
cameras = CamerasHandler.get_cameras_info()
|
||||
cameras = self._get_cameras_info()
|
||||
|
||||
for cam in cameras:
|
||||
if os.path.exists(os.path.join(self.cams_path, cam.name + '.json')):
|
||||
@@ -71,7 +74,7 @@ class SettingsManager(metaclass=Singleton):
|
||||
if "path" not in self.cams[cam.name]:
|
||||
self.cams[cam.name]["path"] = cam.otherPaths[0] if len(cam.otherPaths) == 1 else cam.otherPaths[1]
|
||||
if "video_mode" not in self.cams[cam.name]:
|
||||
video_mode: VideoMode = CamerasHandler.get_usb_camera_by_name(cam.name).enumerateVideoModes()[0]
|
||||
video_mode: VideoMode = self.cams(cam.name).enumerateVideoModes()[0]
|
||||
self.cams[cam.name]["video_mode"] = {
|
||||
"fps": video_mode.fps,
|
||||
"width": video_mode.width,
|
||||
@@ -79,6 +82,59 @@ class SettingsManager(metaclass=Singleton):
|
||||
"pixel_format": str(video_mode.pixelFormat).split('.')[1]
|
||||
}
|
||||
|
||||
def _get_cameras_info(self):
|
||||
arr = []
|
||||
usb_devices = cscore.UsbCamera.enumerateUsbCameras()
|
||||
for index in range(len(usb_devices)):
|
||||
cap = cv2.VideoCapture(index)
|
||||
if cap.isOpened():
|
||||
arr.append(index)
|
||||
cap.release()
|
||||
index += 1
|
||||
|
||||
return [usb_devices[i] for i in arr]
|
||||
|
||||
def _get_or_start_cameras(self):
|
||||
for device in self.cams_info:
|
||||
|
||||
device_name = device.name
|
||||
|
||||
if device.name in self.cams_info:
|
||||
suffix = 0
|
||||
device_name = device.name + str(suffix)
|
||||
|
||||
while device_name in self.cams:
|
||||
suffix +=1
|
||||
device_name = "pipeline" + str(suffix)
|
||||
|
||||
camera = cscore.UsbCamera(name=device_name, dev=device.dev)
|
||||
camera.setPixelFormat(pixelFormat=
|
||||
getattr(VideoMode.PixelFormat,
|
||||
self.get_curr_cam()["video_mode"]["pixel_format"]))
|
||||
camera.setFPS(self.get_curr_cam()["video_mode"]["fps"])
|
||||
camera.setResolution(width=self.get_curr_cam()["video_mode"]["width"],
|
||||
height=self.get_curr_cam()["video_mode"]["height"])
|
||||
self.cams[device_name] = camera
|
||||
|
||||
def _init_camera(self):
|
||||
return self._get_or_start_cameras()
|
||||
|
||||
def _get_usb_camera_by_name(self):
|
||||
pass
|
||||
|
||||
def set_camera_settings(self,usb_camera:cscore.UsbCamera, dic):
|
||||
|
||||
if "brightness" in dic:
|
||||
usb_camera.setBrightness(dic["brightness"])
|
||||
|
||||
if "exposure" in dic:
|
||||
usb_camera.setExposureManual(dic["exposure"])
|
||||
|
||||
if "video_mode" in dic:
|
||||
usb_camera.setVideoMode(dic["video_mode"])
|
||||
|
||||
|
||||
|
||||
# Access methods
|
||||
|
||||
def get_curr_pipeline(self):
|
||||
|
||||
@@ -2,30 +2,27 @@ import cscore
|
||||
import cv2
|
||||
from cscore._cscore import VideoMode
|
||||
|
||||
from ..classes.SettingsManager import SettingsManager
|
||||
|
||||
|
||||
class CamerasHandler:
|
||||
|
||||
@staticmethod
|
||||
def get_cameras_info():
|
||||
|
||||
if not getattr(CamerasHandler, "cams_info", False):
|
||||
|
||||
arr = []
|
||||
|
||||
usb_devices = cscore.UsbCamera.enumerateUsbCameras()
|
||||
|
||||
for index in range(len(usb_devices)):
|
||||
cap = cv2.VideoCapture(index)
|
||||
if cap.isOpened():
|
||||
arr.append(index)
|
||||
cap.release()
|
||||
index += 1
|
||||
|
||||
setattr(CamerasHandler, "cams_info", [usb_devices[i] for i in arr])
|
||||
|
||||
return getattr(CamerasHandler, "cams_info")
|
||||
# def get_cameras_info():
|
||||
#
|
||||
# if not getattr(CamerasHandler, "cams_info", False):
|
||||
#
|
||||
# arr = []
|
||||
#
|
||||
# usb_devices = cscore.UsbCamera.enumerateUsbCameras()
|
||||
#
|
||||
# for index in range(len(usb_devices)):
|
||||
# cap = cv2.VideoCapture(index)
|
||||
# if cap.isOpened():
|
||||
# arr.append(index)
|
||||
# cap.release()
|
||||
# index += 1
|
||||
#
|
||||
# setattr(CamerasHandler, "cams_info", [usb_devices[i] for i in arr])
|
||||
#
|
||||
# return getattr(CamerasHandler, "cams_info")
|
||||
|
||||
@staticmethod
|
||||
def get_or_start_cameras(usb_devices):
|
||||
@@ -57,13 +54,13 @@ class CamerasHandler:
|
||||
|
||||
return getattr(CamerasHandler, "cams")
|
||||
|
||||
@staticmethod
|
||||
def init_camera():
|
||||
return CamerasHandler.get_or_start_cameras(CamerasHandler.get_cameras_info())
|
||||
|
||||
@staticmethod
|
||||
def get_usb_camera_by_name(cam_name):
|
||||
return CamerasHandler.get_or_start_cameras(CamerasHandler.get_cameras_info())[cam_name]
|
||||
# @staticmethod
|
||||
# def init_camera():
|
||||
# return CamerasHandler.get_or_start_cameras(CamerasHandler.get_cameras_info())
|
||||
#
|
||||
# @staticmethod
|
||||
# def get_usb_camera_by_name(cam_name):
|
||||
# return CamerasHandler.get_or_start_cameras(CamerasHandler.get_cameras_info())[cam_name]
|
||||
|
||||
@staticmethod
|
||||
def set_camera_settings(usb_camera: cscore.UsbCamera, dic):
|
||||
|
||||
@@ -63,7 +63,7 @@ class ChameleonWebSocket(tornado.websocket.WebSocketHandler):
|
||||
full_settings = self.settings_manager.general_settings.copy()
|
||||
|
||||
try:
|
||||
full_settings["data"] = self.settings_manager.get_curr_pipeline()
|
||||
full_settings = self.settings_manager.get_curr_pipeline()
|
||||
except NoCameraConnectedException:
|
||||
# TODO: return something if no camera connected
|
||||
full_settings["data"] = None
|
||||
|
||||
@@ -4,7 +4,7 @@ import networktables
|
||||
import cv2
|
||||
import numpy
|
||||
from cscore import CameraServer
|
||||
from CamerasHandler import CamerasHandler
|
||||
from .CamerasHandler import CamerasHandler
|
||||
from app.classes.SettingsManager import SettingsManager
|
||||
import time
|
||||
import json
|
||||
|
||||
Reference in New Issue
Block a user