import cscore from networktables import NetworkTables import networktables import cv2 import numpy from cscore import CameraServer from app.classes.SettingsManager import SettingsManager from ..classes.Singleton import Singleton import time from multiprocessing import Process, Pipe import multiprocessing import threading class VisionHandler(metaclass=Singleton): def __init__(self): self.kernel = numpy.ones((5, 5), numpy.uint8) def _hsv_threshold(self, hue: list, saturation: list, value: list, img: numpy.ndarray, is_erode: bool, is_dilate: bool): # img = cv2.medianBlur(img, 1) # not sure if we need noise reduction now with erode it hurts the precision if val is to high img = cv2.erode(img, kernel=self.kernel, iterations=is_erode) img = cv2.dilate(img, kernel=self.kernel, iterations=is_dilate) out = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) return cv2.inRange(out, (hue[0], saturation[0], value[0]), (hue[1], saturation[1], value[1])) def find_contours(self, binary_img: numpy.ndarray): _, contours, _ = cv2.findContours(binary_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) return contours def filter_contours(self, input_contours, camera_area, area, ratio, extent): output = [] rectangle = [] for contour in input_contours: rect = cv2.minAreaRect(contour) # center_point = rect[0] contour_area = cv2.contourArea(contour) rect_area = rect[1][0] * rect[1][1] try: extent_percent = float(contour_area) / rect_area ratio_percent = float(rect[1][0]) / rect[1][1] area_percent = rect_area / camera_area except: continue if area_percent < area[0] or area_percent > area[1]: continue if ratio_percent < ratio[0] or ratio_percent > ratio[1]: continue if extent_percent < extent[0] or extent_percent > extent[1]: continue output.append(contour) rectangle.append(rect) return [output, rectangle] def draw_image(self, input_image: numpy.ndarray, is_binary: bool, rectangles): if is_binary: input_image = cv2.cvtColor(input_image, cv2.COLOR_GRAY2RGB) for rectangle in rectangles[1]: box = cv2.boxPoints(rectangle) box = numpy.int0(box) cv2.drawContours(input_image, [box], 0, (0, 0, 255), 2) center_point = (int(rectangle[0][0]), int(rectangle[0][1])) cv2.circle(input_image, center_point, 0, (0, 255, 0), thickness=3, lineType=8, shift=0) return input_image def run(self): procs = [] # NetworkTables.startClientTeam(team=SettingsManager.general_settings.get("team_number", 1577)) NetworkTables.initialize("localhost") # NetworkTables.initialize() cs = CameraServer.getInstance() pipes = [] for cam_name in SettingsManager().usb_cameras: threading.Thread(target=self.thred_proc, args=(cs,cam_name)).start() def thred_proc(self,cs,cam_name): async def pipe_send(pipe, data): pipe.send(data) async def pipe_recive(pipe): return pipe.recv() cv_sink = cs.getVideo(camera=SettingsManager.usb_cameras[cam_name]) image = numpy.zeros(shape=(SettingsManager().cams[cam_name]["video_mode"]["width"], SettingsManager().cams[cam_name]["video_mode"]["height"], 3), dtype=numpy.uint8) cv_publish = cs.putVideo(name=cam_name, width=SettingsManager().cams[cam_name]["video_mode"]["width"], height=SettingsManager().cams[cam_name]["video_mode"]["height"]) parent,child = Pipe() Process(target=self.camera_process, args=(SettingsManager.usb_cameras[cam_name], cam_name, child)).start() while True: start = time.time() _, image = cv_sink.grabFrame(image) parent.send(image) # pipe_send(parent,image) image = parent.recv() cv_publish.putFrame(image) end = time.time() print(cam_name + " " + str(1 / (end - start))) def camera_process(self, camera, cam_name, child_pipe): curr_pipline = list(SettingsManager.cams[cam_name]["pipelines"].values())[0] # def change_camera_values(): # camera.setBrightness(0) # camera.setExposureManual(0) # # def pipeline_listener(table, key, value, is_new): # if (is_new): # curr_pipline = SettingsManager.cams[cam_name]["pipelines"][value] # change_camera_values() # # def mode_listener(table, key, value, is_new): # pass # # table = NetworkTables.getTable("/Chameleon-Vision/" + camera.getInfo().name) # # table.addEntryListenerEx(pipeline_listener, key="Pipeline", # flags=networktables.NetworkTablesInstance.NotifyFlags.UPDATE) # table.addEntryListenerEx(mode_listener, key="Driver_Mode", # flags=networktables.NetworkTablesInstance.NotifyFlags.UPDATE) # change_camera_values() cam_area = SettingsManager().cams[cam_name]["video_mode"]["width"] * \ SettingsManager().cams[cam_name]["video_mode"]["height"] while True: image = child_pipe.recv() hsv_image = self._hsv_threshold(curr_pipline["hue"], curr_pipline["saturation"], curr_pipline["value"], image, curr_pipline["erode"], curr_pipline["dilate"]) # if table.getBoolean("Driver_Mode", False): contours = self.find_contours(hsv_image) filtered_contours = self.filter_contours(contours, cam_area, curr_pipline["area"], curr_pipline["ratio"], curr_pipline["extent"]) image = self.draw_image(input_image=image, is_binary=False, rectangles=filtered_contours) child_pipe.send(image)