Files
PhotonVision/backend/app/classes/SettingsManager.py
2019-08-20 23:39:03 +03:00

264 lines
9.6 KiB
Python

import os
import json
import cv2
import cscore
import subprocess
from cscore._cscore import VideoMode
from .Singleton import Singleton
from .Exceptions import PipelineAlreadyExistsException, NoCameraConnectedException
from ..handlers.IPHandler import ChangeIP
class SettingsManager(metaclass=Singleton):
cams = {}
usb_cameras = {}
usb_cameras_info = {}
general_settings = {}
cams_port = {}
cams_curr_pipeline = {}
default_pipeline = {
"exposure": 50,
"brightness": 50,
"orientation": "Normal",
"hue": [0, 100],
"saturation": [0, 100],
"value": [0, 100],
"erode": False,
"dilate": False,
"area": [0, 100],
"ratio": [0, 20],
"extent": [0, 100],
"is_binary": 0,
"sort_mode": "Largest",
"target_group": 'Single',
"target_intersection": 'Up',
"M": 1,
"B": 0
}
default_general_settings = {
"team_number": 1577,
"connection_type": "DHCP",
"ip": "",
"gateway": "",
"netmask": "",
"hostname": "Chameleon-Vision",
"curr_camera": "",
"curr_pipeline": ""
}
def __init__(self):
self.settings_path = os.path.join(os.getcwd(), "settings")
self.cams_path = os.path.join(self.settings_path, "cams")
self._init_general_settings()
ChangeIP(connection_type=self.general_settings['connection_type'], hostname=self.general_settings['hostname'],
ip=self.general_settings['ip'],
netmask=self.general_settings['netmask'], gateway=self.general_settings['gateway'])
self._init_cameras_info()
self._init_usb_cameras()
self._init_cameras()
self._init_usb_cameras_settings()
if self.general_settings["curr_camera"] not in self.cams:
if len(self.cams) > 0:
cam_name = list(self.cams.keys())[0]
self.general_settings["curr_camera"] = cam_name
self.general_settings["curr_pipeline"] = list(self.cams[cam_name]["pipelines"].keys())[0]
else:
self.general_settings["curr_camera"] = ""
self.general_settings["curr_pipeline"] = ""
def _init_general_settings(self):
try:
with open(os.path.join(self.settings_path, 'settings.json')) as setting_file:
self.general_settings = json.load(setting_file)
except FileNotFoundError:
self.general_settings = self.default_general_settings.copy()
# Initiate our camera's settings
def _init_cameras(self):
for cam_name in self.usb_cameras_info:
if os.path.exists(os.path.join(self.cams_path, cam_name + '.json')):
with open(os.path.join(self.cams_path, cam_name + '.json'), 'r') as camera:
self.cams[cam_name] = json.load(camera)
if len(self.cams[cam_name]["pipelines"]) == 0:
self.create_new_pipeline(cam_name=cam_name)
else:
self.create_new_cam(cam_name)
# Initiate true usb cameras(filters microphones and double cameras)
def _init_cameras_info(self):
true_cameras = []
usb_devices = cscore.UsbCamera.enumerateUsbCameras()
for index, device in enumerate(usb_devices):
cap = cv2.VideoCapture(device.dev)
if cap.isOpened():
true_cameras.append(index)
cap.release()
for i in true_cameras:
device_name = usb_devices[i].name
suffix = 0
while device_name in self.usb_cameras_info:
suffix += 1
device_name = f"{device.name}({str(suffix)})"
self.usb_cameras_info[device_name] = usb_devices[i]
# Initiate cscore usb devices
def _init_usb_cameras(self):
for device_name in self.usb_cameras_info:
device = self.usb_cameras_info[device_name]
camera = cscore.UsbCamera(name=device_name, dev=device.dev)
self.usb_cameras[device_name] = camera
def _init_usb_cameras_settings(self):
for cam_name in self.usb_cameras:
self.usb_cameras[cam_name].setPixelFormat(pixelFormat=getattr(VideoMode.PixelFormat, self.cams[cam_name]["video_mode"]["pixel_format"]))
self.usb_cameras[cam_name].setFPS(self.cams[cam_name]["video_mode"]["fps"])
self.usb_cameras[cam_name].setResolution(width=self.cams[cam_name]["video_mode"]["width"], height=self.cams[cam_name]["video_mode"]["height"])
# Change usb camera settings
def set_camera_settings(self, camera_name, dic):
if "brightness" in dic:
self.usb_cameras[camera_name].setBrightness(dic["brightness"])
if "exposure" in dic:
self.usb_cameras[camera_name].setExposureManual(dic["exposure"])
if "resolution" in dic:
video_mode: VideoMode = self.usb_cameras[camera_name].enumerateVideoModes()[int(dic["resolution"])]
self.cams[camera_name]["video_mode"] = {
"fps": video_mode.fps,
"width": video_mode.width,
"height": video_mode.height,
"pixel_format": str(video_mode.pixelFormat).split('.')[1]
}
self.usb_cameras[camera_name].setVideoMode(self.usb_cameras[camera_name].enumerateVideoModes()[int(dic["resolution"])])
if "FOV" in dic:
self.cams[camera_name]["FOV"] = float(dic["FOV"])
# Access methods
def get_curr_pipeline(self):
if self.general_settings["curr_pipeline"]:
return self.cams[self.general_settings["curr_camera"]]["pipelines"][self.general_settings["curr_pipeline"]]
raise NoCameraConnectedException()
def get_resolution_list(self):
if self.general_settings["curr_camera"]:
str_list = []
for val in self.usb_cameras[self.general_settings["curr_camera"]].enumerateVideoModes():
str_list.append("{width} X {height} at {fps} fps".format(width=str(val.width),
height=str(val.height), fps=str(val.fps)))
return str_list
raise NoCameraConnectedException()
def get_curr_cam(self):
if self.general_settings["curr_camera"]:
return self.cams[self.general_settings["curr_camera"]]
raise NoCameraConnectedException()
def set_curr_camera(self, cam_name):
if cam_name in self.cams:
self.general_settings["curr_camera"] = cam_name
self.general_settings["curr_pipeline"] = list(self.get_curr_cam()["pipelines"].keys())[0]
def set_curr_pipeline(self, pipe_name):
if pipe_name in self.get_curr_cam()["pipelines"]:
self.general_settings["curr_pipeline"] = pipe_name
def change_pipeline_values(self, dic, cam_name=None, pipe_name=None):
if not cam_name:
cam_name = self.general_settings["curr_camera"]
if not pipe_name:
pipe_name = self.general_settings["curr_pipeline"]
for key in dic:
if key in self.default_pipeline:
self.cams[cam_name]["pipelines"][pipe_name][key] = dic[key]
def change_general_settings_values(self, dic):
for key in dic['change_general_settings_values']:
if key in self.default_general_settings.keys():
self.general_settings[key] = dic['change_general_settings_values'][key]
self.save_settings()
subprocess.call(['reboot'])
# after all values has been set change settings
# Creators
def create_new_pipeline(self, pipe_name=None, cam_name=None):
if not cam_name:
cam_name = self.general_settings["curr_camera"]
if not pipe_name:
suffix = 0
pipe_name = "pipeline" + str(suffix)
while pipe_name in self.cams[cam_name]["pipelines"]:
suffix += 1
pipe_name = "pipeline" + str(suffix)
elif self.cams[cam_name]["pipelines"][pipe_name]:
raise PipelineAlreadyExistsException(pipe_name)
self.cams[cam_name]["pipelines"][pipe_name] = self.default_pipeline.copy()
def create_new_cam(self, cam_name):
self.cams[cam_name] = {}
self.cams[cam_name]["pipelines"] = {}
for i in range(10):
self.create_new_pipeline(cam_name=cam_name)
self.cams[cam_name]["path"] = self.usb_cameras_info[cam_name].otherPaths[0] if len(
self.usb_cameras_info[cam_name].otherPaths) == 1 else self.usb_cameras_info[cam_name].otherPaths[1]
video_mode: VideoMode = self.usb_cameras[cam_name].enumerateVideoModes()[0]
self.cams[cam_name]["video_mode"] = {
"fps": video_mode.fps,
"width": video_mode.width,
"height": video_mode.height,
"pixel_format": str(video_mode.pixelFormat).split('.')[1],
}
self.cams[cam_name]['resolution'] = 0
self.cams[cam_name]["FOV"] = 60.8
# Savers
def save_settings(self):
self._save_general_settings()
self._save_cameras()
def _save_cameras(self):
if not os.path.exists(self.cams_path):
os.mkdir(self.cams_path)
for cam in self.cams:
with open(os.path.join(self.cams_path, cam + '.json'), 'w+') as camera:
json.dump(self.cams[cam], camera)
def _save_general_settings(self):
if not os.path.exists(self.settings_path):
os.mkdir(self.settings_path)
with open(os.path.join(self.settings_path, 'settings.json'), 'w+') as setting_file:
json.dump(self.general_settings, setting_file)