import copy
import json
import os

import cscore
import cv2
from cscore._cscore import VideoMode

from .Exceptions import PipelineAlreadyExistsException, NoCameraConnectedException
from .Singleton import Singleton


class SettingsManager(metaclass=Singleton):
    """Loads, mutates and saves general and per-camera vision settings.

    Settings are stored as JSON under ``<cwd>/settings``: ``settings.json``
    holds the general settings and ``cams/<camera name>.json`` holds one
    camera's settings. The three camera dictionaries (``cams``,
    ``usb_cameras`` and ``usb_cameras_info``) all share the same keys: the
    device name reported by cscore enumeration.
    """

    # Per-camera settings keyed by camera name; each entry holds "pipelines"
    # and, when the device could be queried, "path" and "video_mode".
    cams = {}
    # cscore.UsbCamera objects keyed by camera name.
    usb_cameras = {}
    # Device descriptors from cscore enumeration, keyed by camera name.
    usb_cameras_info = {}
    # Application-wide settings; see default_general_settings for the keys.
    general_settings = {}

    # Template for a newly created pipeline. Its keys double as the whitelist
    # of pipeline values that may be changed.
    default_pipeline = {
        "exposure": 50,
        "brightness": 50,
        "orientation": "Normal",
        "resolution": [320, 160],
        "hue": [0, 100],
        "saturation": [0, 100],
        "value": [0, 100],
        "erode": False,
        "dilate": False,
        "area": [0, 100],
        "ratio": [0, 20],
        "extent": [0, 100]
    }

    # Fallback general settings. Its keys double as the whitelist of general
    # settings that may be changed.
    default_general_settings = {
        "team_number": 1577,
        "connection_type": "DHCP",
        "ip": "",
        "gateway": "",
        "hostname": "Chameleon-Vision",
        "curr_camera": "",
        "curr_pipeline": ""
    }

    def __init__(self):
        """Load settings from disk, discover cameras and pick a valid selection."""
        self.settings_path = os.path.join(os.getcwd(), "settings")
        self.cams_path = os.path.join(self.settings_path, "cams")

        self._init_general_settings()
        self._init_cameras_info()
        # Camera objects must exist before _init_cameras: creating settings
        # for a new camera queries the device for its video modes.
        self._init_usb_cameras()
        self._init_cameras()
        self._select_valid_current()

    def _select_valid_current(self):
        """Point curr_camera / curr_pipeline at entries that actually exist."""
        if not self.cams:
            return

        if self.general_settings["curr_camera"] not in self.cams:
            self.general_settings["curr_camera"] = next(iter(self.cams))

        pipelines = self.cams[self.general_settings["curr_camera"]]["pipelines"]
        if self.general_settings["curr_pipeline"] not in pipelines:
            # A saved pipeline name can be stale even when the camera is not.
            self.general_settings["curr_pipeline"] = next(iter(pipelines), "")

    def _init_general_settings(self):
        """Read settings.json, falling back to the defaults for anything missing."""
        settings = self.default_general_settings.copy()
        try:
            with open(os.path.join(self.settings_path, 'settings.json')) as setting_file:
                # Overlay the file on the defaults so keys absent from an
                # older file still exist.
                settings.update(json.load(setting_file))
        except FileNotFoundError:
            # No saved settings yet: run on the defaults.
            pass
        self.general_settings = settings

    def _init_cameras(self):
        """Load or create the settings of every detected camera."""
        for cam_name in self.usb_cameras_info:
            cam_file = os.path.join(self.cams_path, cam_name + '.json')
            if os.path.exists(cam_file):
                with open(cam_file, 'r') as camera:
                    self.cams[cam_name] = json.load(camera)

                # A camera must always have at least one pipeline.
                if not self.cams[cam_name].setdefault("pipelines", {}):
                    self.create_new_pipeline(cam_name=cam_name)
            else:
                self.create_new_cam(cam_name)

            self._apply_video_mode(cam_name)

    def _init_cameras_info(self):
        """Record the enumerated USB devices that OpenCV is able to open.

        Devices that cannot be opened (the original note mentions microphones
        and duplicate camera nodes) are skipped. Entries are keyed by device
        name, so a later device with the same name replaces an earlier one.
        """
        for device in cscore.UsbCamera.enumerateUsbCameras():
            # Probe by device number, the same identifier later handed to
            # cscore.UsbCamera, rather than by position in the list.
            cap = cv2.VideoCapture(device.dev)
            try:
                usable = cap.isOpened()
            finally:
                cap.release()

            if usable:
                self.usb_cameras_info[device.name] = device

    def _init_usb_cameras(self):
        """Create a cscore.UsbCamera for every detected device."""
        for cam_name, info in self.usb_cameras_info.items():
            self.usb_cameras[cam_name] = cscore.UsbCamera(name=cam_name, dev=info.dev)
            # No-op unless this camera's settings are already loaded;
            # _init_cameras applies the mode once they are.
            self._apply_video_mode(cam_name)

    def _apply_video_mode(self, cam_name):
        """Push the stored video mode of ``cam_name`` to its camera, if both exist."""
        camera = self.usb_cameras.get(cam_name)
        video_mode = self.cams.get(cam_name, {}).get("video_mode")
        if camera is not None and video_mode:
            self._configure_camera(camera, video_mode)

    @staticmethod
    def _configure_camera(camera, video_mode):
        """Set pixel format, fps and resolution of ``camera`` from a settings dict."""
        pixel_format = getattr(VideoMode.PixelFormat, video_mode["pixel_format"])
        camera.setPixelFormat(pixelFormat=pixel_format)
        camera.setFPS(video_mode["fps"])
        camera.setResolution(width=video_mode["width"], height=video_mode["height"])

    def set_camera_settings(self, camera_name, dic):
        """Apply the camera-level settings present in ``dic`` to a USB camera.

        Recognised keys are "brightness", "exposure" and "video_mode"; other
        keys are ignored. "video_mode" may be a settings dict (fps, width,
        height, pixel_format) or a value accepted by ``setVideoMode``.

        Raises:
            KeyError: if a recognised key is given for an unknown camera.
        """
        if "brightness" in dic:
            self.usb_cameras[camera_name].setBrightness(dic["brightness"])

        if "exposure" in dic:
            self.usb_cameras[camera_name].setExposureManual(dic["exposure"])

        if "video_mode" in dic:
            video_mode = dic["video_mode"]
            if isinstance(video_mode, dict):
                self._configure_camera(self.usb_cameras[camera_name], video_mode)
            else:
                self.usb_cameras[camera_name].setVideoMode(video_mode)

    # Access methods

    def get_curr_pipeline(self):
        """Return the settings dict of the currently selected pipeline.

        Raises:
            NoCameraConnectedException: if no pipeline is selected.
        """
        if self.general_settings["curr_pipeline"]:
            return self.cams[self.general_settings["curr_camera"]]["pipelines"][self.general_settings["curr_pipeline"]]

        raise NoCameraConnectedException()

    def get_curr_cam(self):
        """Return the settings dict of the currently selected camera.

        Raises:
            NoCameraConnectedException: if no camera is selected.
        """
        if self.general_settings["curr_camera"]:
            return self.cams[self.general_settings["curr_camera"]]

        raise NoCameraConnectedException()

    def set_curr_camera(self, cam_name):
        """Select ``cam_name`` and its first pipeline; unknown names are ignored."""
        if cam_name in self.cams:
            self.general_settings["curr_camera"] = cam_name
            # dict views are not subscriptable on Python 3, so take the first
            # key through an iterator.
            self.general_settings["curr_pipeline"] = next(iter(self.cams[cam_name]["pipelines"]), "")

    def set_curr_pipeline(self, pipe_name):
        """Select ``pipe_name`` on the current camera; unknown names are ignored.

        Raises:
            NoCameraConnectedException: if no camera is selected.
        """
        if pipe_name in self.get_curr_cam()["pipelines"]:
            self.general_settings["curr_pipeline"] = pipe_name

    def change_pipeline_values(self, dic, cam_name=None, pipe_name=None):
        """Update a pipeline with the recognised entries of ``dic``.

        ``cam_name`` and ``pipe_name`` default to the current selection. Keys
        that are not part of ``default_pipeline`` are ignored.
        """
        if not cam_name:
            cam_name = self.general_settings["curr_camera"]

        if not pipe_name:
            pipe_name = self.general_settings["curr_pipeline"]

        for key in dic:
            # Test membership, not truthiness: "erode" and "dilate" default to
            # False and must still be changeable.
            if key in self.default_pipeline:
                self.cams[cam_name]["pipelines"][pipe_name][key] = dic[key]

    def change_general_settings_values(self, dic):
        """Update the general settings with the recognised entries of ``dic``.

        Keys that are not part of ``default_general_settings`` are ignored.
        """
        for key in dic:
            # Test membership, not truthiness: several defaults are "".
            if key in self.default_general_settings:
                self.general_settings[key] = dic[key]

    # Creators

    def create_new_pipeline(self, pipe_name=None, cam_name=None):
        """Add a pipeline holding the default values to a camera.

        Args:
            pipe_name: name of the new pipeline; when omitted the first free
                "pipeline<N>" name is used.
            cam_name: camera to add the pipeline to; defaults to the current
                camera.

        Raises:
            PipelineAlreadyExistsException: if ``pipe_name`` is already used.
        """
        if not cam_name:
            cam_name = self.general_settings["curr_camera"]

        pipelines = self.cams[cam_name]["pipelines"]

        if not pipe_name:
            suffix = 0
            pipe_name = "pipeline" + str(suffix)

            while pipe_name in pipelines:
                suffix += 1
                pipe_name = "pipeline" + str(suffix)
        elif pipe_name in pipelines:
            raise PipelineAlreadyExistsException(pipe_name)

        # Deep copy so pipelines never share the nested range lists.
        pipelines[pipe_name] = copy.deepcopy(self.default_pipeline)

    def create_new_cam(self, cam_name):
        """Create default settings for a detected camera.

        The entry gets one default pipeline, the device path and the first
        video mode the camera reports.

        Raises:
            KeyError: if ``cam_name`` is not a detected camera.
        """
        # Resolve both device objects first so an unknown name does not leave
        # a half-built settings entry behind.
        info = self.usb_cameras_info[cam_name]
        camera = self.usb_cameras[cam_name]

        self.cams[cam_name] = {}
        self.cams[cam_name]["pipelines"] = {}
        self.create_new_pipeline(cam_name=cam_name)

        other_paths = info.otherPaths
        if len(other_paths) == 1:
            path = other_paths[0]
        elif other_paths:
            path = other_paths[1]
        else:
            # No alternative paths reported: fall back to the primary path.
            path = info.path
        self.cams[cam_name]["path"] = path

        video_modes = camera.enumerateVideoModes()
        if video_modes:
            video_mode = video_modes[0]
            self.cams[cam_name]["video_mode"] = {
                "fps": video_mode.fps,
                "width": video_mode.width,
                "height": video_mode.height,
                # Keep only the enum member name, e.g. "kMJPEG".
                "pixel_format": str(video_mode.pixelFormat).split('.')[-1]
            }

    # Savers

    def save_settings(self):
        """Write the general settings and every camera's settings to disk."""
        self._save_general_settings()
        self._save_cameras()

    def _save_cameras(self):
        """Write each camera's settings to ``cams/<camera name>.json``."""
        # makedirs also creates the parent settings directory when missing.
        os.makedirs(self.cams_path, exist_ok=True)

        for cam in self.cams:
            with open(os.path.join(self.cams_path, cam + '.json'), 'w') as camera:
                json.dump(self.cams[cam], camera)

    def _save_general_settings(self):
        """Write the general settings to ``settings.json``."""
        os.makedirs(self.settings_path, exist_ok=True)

        with open(os.path.join(self.settings_path, 'settings.json'), 'w') as setting_file:
            json.dump(self.general_settings, setting_file)