mirror of
https://github.com/PhotonVision/photonvision
synced 2026-06-29 02:21:41 +00:00
Added shit
This commit is contained in:
@@ -4,11 +4,13 @@ from app.ChameleonVisionApp import ChameleonApplication
|
||||
from app.classes.SettingsManager import SettingsManager
|
||||
from tornado.options import options
|
||||
from handlers.CamerasHandler import CamerasHandler
|
||||
from handlers.VisionHandler import VisionHandler
|
||||
|
||||
if __name__ == "__main__":
|
||||
mng = SettingsManager()
|
||||
a = CamerasHandler.get_cameras()
|
||||
b = CamerasHandler.start_cameras(a)
|
||||
a = CamerasHandler.get_cameras_info()
|
||||
b = CamerasHandler.get_or_start_cameras(a)
|
||||
VisionHandler().run()
|
||||
|
||||
tornado.options.parse_command_line()
|
||||
app = ChameleonApplication()
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
|
||||
class PipelineAlreadyExistsException(Excetion):
|
||||
class PipelineAlreadyExistsException(Exception):
|
||||
|
||||
def __init__(self, pipe_name):
|
||||
super(f"Pipeline {pipe_name} already exists")
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import os
|
||||
import json
|
||||
import jsonpass
|
||||
from .Singleton import Singleton
|
||||
from handlers.CamerasHandler import CamerasHandler
|
||||
from .Exceptions import PipelineAlreadyExistsException
|
||||
@@ -11,12 +11,15 @@ class SettingsManager(metaclass=Singleton):
|
||||
|
||||
default_pipeline = {
|
||||
"exposure": 50,
|
||||
"brightness": 50
|
||||
"brightness": 50,
|
||||
"hue": [0, 100],
|
||||
"saturation": [0, 100],
|
||||
"value": [0, 100]
|
||||
}
|
||||
default_general_settings = {
|
||||
"team_number": 1577,
|
||||
"curr_camera": "cam1",
|
||||
"curr_pipeline": "pipeline1"
|
||||
"curr_camera": "",
|
||||
"curr_pipeline": ""
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
@@ -25,7 +28,7 @@ class SettingsManager(metaclass=Singleton):
|
||||
self._init_general_settings()
|
||||
self._init_cameras()
|
||||
|
||||
if not self.cams[self.general_settings["curr_camera"]]:
|
||||
if self.general_settings["curr_camera"] not in self.cams and len(list(self.cams.keys())) > 0:
|
||||
self.general_settings["curr_camera"] = list(self.cams.keys())[0]
|
||||
|
||||
def _init_general_settings(self):
|
||||
@@ -36,7 +39,7 @@ class SettingsManager(metaclass=Singleton):
|
||||
self.general_settings = self.default_general_settings.copy()
|
||||
|
||||
def _init_cameras(self):
|
||||
cameras = CamerasHandler.get_cameras()
|
||||
cameras = CamerasHandler.get_cameras_info()
|
||||
|
||||
for cam in cameras:
|
||||
if os.path.exists(os.path.join(self.cams_path, cam.name)):
|
||||
@@ -98,7 +101,7 @@ class SettingsManager(metaclass=Singleton):
|
||||
suffix = 0
|
||||
pipe_name = "pipeline" + str(suffix)
|
||||
|
||||
while pipe_name not in self.cams[cam_name]["pipelines"]:
|
||||
while pipe_name in self.cams[cam_name]["pipelines"]:
|
||||
suffix += 1
|
||||
pipe_name = "pipeline" + str(suffix)
|
||||
elif self.cams[cam_name]["pipelines"][pipe_name]:
|
||||
|
||||
@@ -5,24 +5,55 @@ import cv2
|
||||
class CamerasHandler:
|
||||
|
||||
@staticmethod
|
||||
def get_cameras():
|
||||
arr = []
|
||||
def get_cameras_info():
|
||||
|
||||
usb_devices = cscore.UsbCamera.enumerateUsbCameras()
|
||||
if not getattr(CamerasHandler, "cams_info", False):
|
||||
|
||||
for index in range(len(usb_devices)):
|
||||
cap = cv2.VideoCapture(index)
|
||||
if cap.isOpened():
|
||||
arr.append(index)
|
||||
cap.release()
|
||||
index += 1
|
||||
return [usb_devices[i] for i in arr]
|
||||
arr = []
|
||||
|
||||
usb_devices = cscore.UsbCamera.enumerateUsbCameras()
|
||||
|
||||
for index in range(len(usb_devices)):
|
||||
cap = cv2.VideoCapture(index)
|
||||
if cap.isOpened():
|
||||
arr.append(index)
|
||||
cap.release()
|
||||
index += 1
|
||||
|
||||
setattr(CamerasHandler, "cams_info", [usb_devices[i] for i in arr])
|
||||
|
||||
return getattr(CamerasHandler, "cams_info")
|
||||
|
||||
@staticmethod
|
||||
def start_cameras(usb_devices):
|
||||
cameras = []
|
||||
for device in usb_devices:
|
||||
camera = cscore.UsbCamera(name='', dev=device.dev)
|
||||
camera.setPixelFormat(pixelFormat=camera.enumerateVideoModes()[0].pixelFormat) #TODO if dictionary is empy do this else take from dictionary
|
||||
cameras.append(camera)
|
||||
return cameras
|
||||
def get_or_start_cameras(usb_devices):
|
||||
|
||||
if not getattr(CamerasHandler, "cams", False):
|
||||
cameras = {}
|
||||
for device in usb_devices:
|
||||
device_name = device.name
|
||||
|
||||
if device.name in cameras:
|
||||
suffix = 0
|
||||
device_name = device.name + str(suffix)
|
||||
|
||||
while device_name in cameras:
|
||||
suffix += 1
|
||||
device_name = "pipeline" + str(suffix)
|
||||
|
||||
camera = cscore.UsbCamera(name=device_name, dev=device.dev)
|
||||
camera.setPixelFormat(pixelFormat=camera.enumerateVideoModes()[0].pixelFormat) #TODO if dictionary is empy do this else take from dictionary
|
||||
|
||||
cameras[device_name] = camera
|
||||
|
||||
setattr(CamerasHandler, "cams", cameras)
|
||||
|
||||
return getattr(CamerasHandler, "cams")
|
||||
|
||||
@staticmethod
|
||||
def get_usb_camera_by_name(cam_name):
|
||||
return CamerasHandler.get_or_start_cameras(CamerasHandler.get_cameras_info())[cam_name]
|
||||
|
||||
@staticmethod
|
||||
def change_camera_values(usb_camera: cscore.UsbCamera, dic):
|
||||
usb_camera.setBrightness(dic["brightness"] or 50)
|
||||
usb_camera.setExposureManual(dic["exposure"] or 50)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import tornado.websocket
|
||||
import json
|
||||
import os
|
||||
from .CamerasHandler import CamerasHandler
|
||||
from ..classes.SettingsManager import SettingsManager
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ class ChameleonWebSocket(tornado.websocket.WebSocketHandler):
|
||||
self.init_actions()
|
||||
|
||||
def init_actions(self):
|
||||
self.actions["change_pipeline_values"] = self.settings_manager.change_pipeline_values
|
||||
self.actions["change_pipeline_values"] = self.change_pipeline_values
|
||||
self.actions["change_general_settings_values"] = self.settings_manager.change_general_settings_values
|
||||
self.actions["change_cam"] = self.change_curr_camera
|
||||
self.actions["change_pipeline"] = self.change_curr_pipeline
|
||||
@@ -57,4 +57,11 @@ class ChameleonWebSocket(tornado.websocket.WebSocketHandler):
|
||||
def change_curr_pipeline(self, dic):
|
||||
self.settings_manager.set_curr_pipeline(pipe_name=dic["pipeline"])
|
||||
|
||||
def change_pipeline_values(self, dic):
|
||||
self.settings_manager.change_pipeline_values(dic)
|
||||
|
||||
CamerasHandler.change_camera_values(CamerasHandler.get_usb_camera_by_name(self.settings_manager.general_settings["curr_camera"]), dic)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import cscore
|
||||
import networktables
|
||||
from networktables import NetworkTables
|
||||
import cv2
|
||||
import numpy
|
||||
from cscore import CameraServer
|
||||
from app.classes.SettingsManager import SettingsManager
|
||||
|
||||
|
||||
@@ -22,8 +23,7 @@ class VisionHandler:
|
||||
_, contours, _ = cv2.findContours(binary_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
return contours
|
||||
|
||||
def filter_contours(self, input_contours, camera_area, min_area, max_area, min_ratio, max_ratio, min_extent,
|
||||
max_extent):
|
||||
def filter_contours(self, input_contours, camera_area, min_area, max_area, min_ratio, max_ratio, min_extent, max_extent):
|
||||
output = []
|
||||
rectangle = []
|
||||
|
||||
@@ -66,20 +66,33 @@ class VisionHandler:
|
||||
|
||||
def run(self):
|
||||
camera_server = cscore.CameraServer.getInstance()
|
||||
networktables.NetworkTables.startClientTeam(team=SettingsManager.general_settings["team_number"])
|
||||
networktables.NetworkTables.initialize()
|
||||
NetworkTables.startClientTeam(team=SettingsManager.general_settings.get("team_number", 1577))
|
||||
NetworkTables.initialize()
|
||||
|
||||
def camera_process(self, camera, stream):
|
||||
def driver_mode_listener(key, value, is_new):
|
||||
raise NotImplementedError()
|
||||
|
||||
def pipeline_listener(key, value, is_new):
|
||||
raise NotImplementedError()
|
||||
|
||||
def camera_process(self,camera, stream,):
|
||||
image = numpy.zeros(shape=(0, 0, 3), dtype=numpy.uint8)
|
||||
table = networktables.NetworkTables.getTable("/Chameleon-Vision/"+camera.getInfo().name)
|
||||
table = NetworkTables.getTable("/Chameleon-Vision/" + camera.getInfo().name)
|
||||
NetworkTables.addEntryListener("Driver_Mode", driver_mode_listener)
|
||||
NetworkTables.addEntryListener("Pipeline", pipeline_listener)
|
||||
|
||||
cv_sink = CameraServer.getInstance().getVideo(camera=camera)
|
||||
|
||||
while True:
|
||||
# _, image = cv_sink.grabFrame(image)
|
||||
_, image = cv_sink.grabFrame(image)
|
||||
hsv_image = self._hsv_threshold()
|
||||
filtered_contours = None
|
||||
|
||||
if table.getBoolean("Driver_Mode", False):
|
||||
hsv_image = self._hsv_threshold()
|
||||
contours = self.find_contours()
|
||||
contours = self.find_contours()
|
||||
image = self.draw_image()
|
||||
|
||||
contours = self.find_contours(hsv_image)
|
||||
filtered_contours = self.filter_contours(contours)
|
||||
|
||||
image = self.draw_image(input_image=hsv_image, is_binary=False, rectangles=filtered_contours)
|
||||
stream.putFrame(image)
|
||||
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
{"exposure": 50, "brightness": 50}
|
||||
@@ -1,5 +0,0 @@
|
||||
{
|
||||
"team_number": 1577,
|
||||
"curr_camera": "cam1",
|
||||
"curr_pipeline": "pipeline1"
|
||||
}
|
||||
Reference in New Issue
Block a user