cleanup old code

This commit is contained in:
ori agranat
2019-09-24 19:50:49 +03:00
parent 8dbf5b3a73
commit e8dc3ed3ac
39 changed files with 0 additions and 6141 deletions

View File

@@ -1,36 +0,0 @@
from networktables import NetworkTables
import tornado.ioloop
import logging
from app.ChameleonVisionApp import ChameleonApplication
from app.classes.SettingsManager import SettingsManager
from tornado.options import options
import threading
import asyncio
from app.handlers.CameraHander import CameraHandler
def run_server():
asyncio.set_event_loop(asyncio.new_event_loop())
tornado.options.parse_command_line()
app = ChameleonApplication()
print(f"Serving on port {options.port}")
app.listen(options.port)
tornado.ioloop.IOLoop.current().start()
def run():
NetworkTables.startClientTeam(team=settings_manager.general_settings.get("team_number", 1577))
port = 5550
for cam_name in settings_manager.usb_cameras:
CameraHandler(cam_name, port).run()
port += 1
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
settings_manager = SettingsManager()
run()
server_thread = threading.Thread(target=run_server)
server_thread.start()

View File

View File

@@ -1,27 +0,0 @@
import tornado.web
import tornado.websocket
import os
from .handlers.MainHandler import MainHandler
from .handlers.SocketHandler import ChameleonWebSocket
from tornado.options import define
define("port", default=8888, help="run on the given port", type=int)
class ChameleonApplication(tornado.web.Application):
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/websocket", ChameleonWebSocket),
(r"/(.*)", tornado.web.StaticFileHandler,
{"path": r"{0}".format(os.path.join(os.path.dirname(__file__), "site"))}),
]
settings = dict({
"template_path": os.path.join(os.path.dirname(__file__), "site"),
"static_path": os.path.join(os.path.dirname(__file__), "site"),
"debug": True
}
)
super(ChameleonApplication, self).__init__(handlers, **settings)

View File

@@ -1,10 +0,0 @@
class PipelineAlreadyExistsException(Exception):
def __init__(self, pipe_name):
super(f"Pipeline {pipe_name} already exists")
class NoCameraConnectedException(Exception):
def __init__(self):
super("No camera as been detected")

View File

@@ -1,263 +0,0 @@
import os
import json
import cv2
import cscore
import subprocess
from cscore._cscore import VideoMode
from .Singleton import Singleton
from .Exceptions import PipelineAlreadyExistsException, NoCameraConnectedException
from ..handlers.IPHandler import ChangeIP
class SettingsManager(metaclass=Singleton):
cams = {}
usb_cameras = {}
usb_cameras_info = {}
general_settings = {}
cams_port = {}
cams_curr_pipeline = {}
default_pipeline = {
"exposure": 50,
"brightness": 50,
"orientation": "Normal",
"hue": [0, 100],
"saturation": [0, 100],
"value": [0, 100],
"erode": False,
"dilate": False,
"area": [0, 100],
"ratio": [0, 20],
"extent": [0, 100],
"is_binary": 0,
"sort_mode": "Largest",
"target_group": 'Single',
"target_intersection": 'Up',
"M": 1,
"B": 0
}
default_general_settings = {
"team_number": 1577,
"connection_type": "DHCP",
"ip": "",
"gateway": "",
"netmask": "",
"hostname": "Chameleon-Vision",
"curr_camera": "",
"curr_pipeline": ""
}
def __init__(self):
self.settings_path = os.path.join(os.getcwd(), "settings")
self.cams_path = os.path.join(self.settings_path, "cams")
self._init_general_settings()
ChangeIP(connection_type=self.general_settings['connection_type'], hostname=self.general_settings['hostname'],
ip=self.general_settings['ip'],
netmask=self.general_settings['netmask'], gateway=self.general_settings['gateway'])
self._init_cameras_info()
self._init_usb_cameras()
self._init_cameras()
self._init_usb_cameras_settings()
if self.general_settings["curr_camera"] not in self.cams:
if len(self.cams) > 0:
cam_name = list(self.cams.keys())[0]
self.general_settings["curr_camera"] = cam_name
self.general_settings["curr_pipeline"] = list(self.cams[cam_name]["pipelines"].keys())[0]
else:
self.general_settings["curr_camera"] = ""
self.general_settings["curr_pipeline"] = ""
def _init_general_settings(self):
try:
with open(os.path.join(self.settings_path, 'settings.json')) as setting_file:
self.general_settings = json.load(setting_file)
except FileNotFoundError:
self.general_settings = self.default_general_settings.copy()
# Initiate our camera's settings
def _init_cameras(self):
for cam_name in self.usb_cameras_info:
if os.path.exists(os.path.join(self.cams_path, cam_name + '.json')):
with open(os.path.join(self.cams_path, cam_name + '.json'), 'r') as camera:
self.cams[cam_name] = json.load(camera)
if len(self.cams[cam_name]["pipelines"]) == 0:
self.create_new_pipeline(cam_name=cam_name)
else:
self.create_new_cam(cam_name)
# Initiate true usb cameras(filters microphones and double cameras)
def _init_cameras_info(self):
true_cameras = []
usb_devices = cscore.UsbCamera.enumerateUsbCameras()
for index, device in enumerate(usb_devices):
cap = cv2.VideoCapture(device.dev)
if cap.isOpened():
true_cameras.append(index)
cap.release()
for i in true_cameras:
device_name = usb_devices[i].name
suffix = 0
while device_name in self.usb_cameras_info:
suffix += 1
device_name = f"{device.name}({str(suffix)})"
self.usb_cameras_info[device_name] = usb_devices[i]
# Initiate cscore usb devices
def _init_usb_cameras(self):
for device_name in self.usb_cameras_info:
device = self.usb_cameras_info[device_name]
camera = cscore.UsbCamera(name=device_name, dev=device.dev)
self.usb_cameras[device_name] = camera
def _init_usb_cameras_settings(self):
for cam_name in self.usb_cameras:
self.usb_cameras[cam_name].setPixelFormat(pixelFormat=getattr(VideoMode.PixelFormat, self.cams[cam_name]["video_mode"]["pixel_format"]))
self.usb_cameras[cam_name].setFPS(self.cams[cam_name]["video_mode"]["fps"])
self.usb_cameras[cam_name].setResolution(width=self.cams[cam_name]["video_mode"]["width"], height=self.cams[cam_name]["video_mode"]["height"])
# Change usb camera settings
def set_camera_settings(self, camera_name, dic):
if "brightness" in dic:
self.usb_cameras[camera_name].setBrightness(dic["brightness"])
if "exposure" in dic:
self.usb_cameras[camera_name].setExposureManual(dic["exposure"])
if "resolution" in dic:
video_mode: VideoMode = self.usb_cameras[camera_name].enumerateVideoModes()[int(dic["resolution"])]
self.cams[camera_name]["video_mode"] = {
"fps": video_mode.fps,
"width": video_mode.width,
"height": video_mode.height,
"pixel_format": str(video_mode.pixelFormat).split('.')[1]
}
self.usb_cameras[camera_name].setVideoMode(self.usb_cameras[camera_name].enumerateVideoModes()[int(dic["resolution"])])
if "FOV" in dic:
self.cams[camera_name]["FOV"] = float(dic["FOV"])
# Access methods
def get_curr_pipeline(self):
if self.general_settings["curr_pipeline"]:
return self.cams[self.general_settings["curr_camera"]]["pipelines"][self.general_settings["curr_pipeline"]]
raise NoCameraConnectedException()
def get_resolution_list(self):
if self.general_settings["curr_camera"]:
str_list = []
for val in self.usb_cameras[self.general_settings["curr_camera"]].enumerateVideoModes():
str_list.append("{width} X {height} at {fps} fps".format(width=str(val.width),
height=str(val.height), fps=str(val.fps)))
return str_list
raise NoCameraConnectedException()
def get_curr_cam(self):
if self.general_settings["curr_camera"]:
return self.cams[self.general_settings["curr_camera"]]
raise NoCameraConnectedException()
def set_curr_camera(self, cam_name):
if cam_name in self.cams:
self.general_settings["curr_camera"] = cam_name
self.general_settings["curr_pipeline"] = list(self.get_curr_cam()["pipelines"].keys())[0]
def set_curr_pipeline(self, pipe_name):
if pipe_name in self.get_curr_cam()["pipelines"]:
self.general_settings["curr_pipeline"] = pipe_name
def change_pipeline_values(self, dic, cam_name=None, pipe_name=None):
if not cam_name:
cam_name = self.general_settings["curr_camera"]
if not pipe_name:
pipe_name = self.general_settings["curr_pipeline"]
for key in dic:
if key in self.default_pipeline:
self.cams[cam_name]["pipelines"][pipe_name][key] = dic[key]
def change_general_settings_values(self, dic):
for key in dic['change_general_settings_values']:
if key in self.default_general_settings.keys():
self.general_settings[key] = dic['change_general_settings_values'][key]
self.save_settings()
subprocess.call(['reboot'])
# after all values has been set change settings
# Creators
def create_new_pipeline(self, pipe_name=None, cam_name=None):
if not cam_name:
cam_name = self.general_settings["curr_camera"]
if not pipe_name:
suffix = 0
pipe_name = "pipeline" + str(suffix)
while pipe_name in self.cams[cam_name]["pipelines"]:
suffix += 1
pipe_name = "pipeline" + str(suffix)
elif self.cams[cam_name]["pipelines"][pipe_name]:
raise PipelineAlreadyExistsException(pipe_name)
self.cams[cam_name]["pipelines"][pipe_name] = self.default_pipeline.copy()
def create_new_cam(self, cam_name):
self.cams[cam_name] = {}
self.cams[cam_name]["pipelines"] = {}
for i in range(10):
self.create_new_pipeline(cam_name=cam_name)
self.cams[cam_name]["path"] = self.usb_cameras_info[cam_name].otherPaths[0] if len(
self.usb_cameras_info[cam_name].otherPaths) == 1 else self.usb_cameras_info[cam_name].otherPaths[1]
video_mode: VideoMode = self.usb_cameras[cam_name].enumerateVideoModes()[0]
self.cams[cam_name]["video_mode"] = {
"fps": video_mode.fps,
"width": video_mode.width,
"height": video_mode.height,
"pixel_format": str(video_mode.pixelFormat).split('.')[1],
}
self.cams[cam_name]['resolution'] = 0
self.cams[cam_name]["FOV"] = 60.8
# Savers
def save_settings(self):
self._save_general_settings()
self._save_cameras()
def _save_cameras(self):
if not os.path.exists(self.cams_path):
os.mkdir(self.cams_path)
for cam in self.cams:
with open(os.path.join(self.cams_path, cam + '.json'), 'w+') as camera:
json.dump(self.cams[cam], camera)
def _save_general_settings(self):
if not os.path.exists(self.settings_path):
os.mkdir(self.settings_path)
with open(os.path.join(self.settings_path, 'settings.json'), 'w+') as setting_file:
json.dump(self.general_settings, setting_file)

View File

@@ -1,8 +0,0 @@
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]

View File

@@ -1,252 +0,0 @@
import math
import cv2
import numpy
from cscore import CameraServer
from app.classes.SettingsManager import SettingsManager
from ..handlers.SocketHandler import send_all_async
from multiprocessing import Process
import threading
import zmq
import asyncio
import time
from networktables import NetworkTables
import networktables
from .VisionHandler import VisionHandler
class CameraHandler:
def __init__(self, cam_name, port):
#settings vars up for vision loop
self.cs = CameraServer.getInstance()
self.settings_manager = SettingsManager()
self.vision_handler = VisionHandler()
self.port = port
self.cam_name = cam_name
self.image = None
self.p_image = None
self.table = None
self.nt_data = {'valid': False}
self.time_stamp = 0
def run(self):
#starting main thread
threading.Thread(target=self.thread_proc).start()
def thread_proc(self):
self.settings_manager.cams_curr_pipeline[self.cam_name] = "pipeline0"
pipeline = self.settings_manager.cams[self.cam_name]["pipelines"][self.settings_manager.cams_curr_pipeline[self.cam_name]]
FOV = self.settings_manager.cams[self.cam_name]["FOV"]
def change_camera_values(pipeline):
self.settings_manager.usb_cameras[self.cam_name].setBrightness(pipeline['brightness'])
self.settings_manager.usb_cameras[self.cam_name].setExposureManual(pipeline['exposure'])
self.settings_manager.usb_cameras[self.cam_name].setWhiteBalanceAuto()
def pipeline_listener(table, key, value, is_new):
asyncio.set_event_loop(asyncio.new_event_loop())
if value in self.settings_manager.cams[self.cam_name]['pipelines'].keys():
self.settings_manager.cams_curr_pipeline[self.cam_name] = value
change_camera_values(pipeline)
if self.cam_name == self.settings_manager.general_settings['curr_camera']:
self.settings_manager.general_settings['curr_pipeline'] = value
update_settings = self.settings_manager.get_curr_pipeline()
update_settings['curr_pipeline'] = self.settings_manager.general_settings["curr_pipeline"]
send_all_async(update_settings)
else:
self.table.putString('Pipeline', self.settings_manager.cams_curr_pipeline[self.cam_name])
def mode_listener(table, key, value, is_new):
if value:
change_camera_values({
'brightness': 25,
'exposure': 15
})
else:
change_camera_values(pipeline)
#setting up network table
self.table = NetworkTables.getTable("/Chameleon-Vision/" + self.cam_name)
#init values for pipeline and driver mode
self.table.putString('Pipeline', self.settings_manager.cams_curr_pipeline[self.cam_name])
self.table.putBoolean('Driver_Mode', False)
self.table.addEntryListenerEx(pipeline_listener, key="Pipeline",
flags=networktables.NetworkTablesInstance.NotifyFlags.UPDATE)
self.table.addEntryListenerEx(mode_listener, key="Driver_Mode",
flags=networktables.NetworkTablesInstance.NotifyFlags.UPDATE)
# getting video from current camera
cv_sink = self.cs.getVideo(camera=self.settings_manager.usb_cameras[self.cam_name])
width = self.settings_manager.cams[self.cam_name]["video_mode"]["width"]
height = self.settings_manager.cams[self.cam_name]["video_mode"]["height"]
# setting up a video server for camera
cv_publish = self.cs.putVideo(name=self.cam_name, width=width, height=height)
# saving camera port in cam name dict for usage in client
self.settings_manager.cams_port[self.cam_name] = self.cs._sinks['serve_' + self.cam_name].getPort()
# setting up a zmq connection to the opencv subprocess
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind('tcp://*:%s' % str(self.port))
# starting the process with initial values
p = Process(target=self.camera_process, args=(self.cam_name, self.port, FOV))
p.start()
change_camera_values(pipeline)
def _publish_thread():
#getting image values and publishing process image and data
self.image = numpy.zeros(shape=(width, height, 3), dtype=numpy.uint8)
self.p_image = self.image
while True:
try:
self.time_stamp, self.image = cv_sink.grabFrame(self.image)
cv_publish.putFrame(self.p_image)
self.table.putBoolean('valid', self.nt_data['valid'])
# check if point is valid
if self.nt_data['valid']:
# send the point using network tables
self.table.putNumber('pitch', self.nt_data['pitch'])
self.table.putNumber('yaw', self.nt_data['yaw'])
self.table.putNumber('time_stamp', self.nt_data['time_stamp'])
self.table.putNumber('fps', self.nt_data['fps'])
# if the selected camera in ui is this cam send the point to the ui
except:
pass
def _socket_thread():
#publishing to websocket at slower interval
asyncio.set_event_loop(asyncio.new_event_loop())
while True:
time.sleep(0.1)
if self.settings_manager.general_settings['curr_camera'] == self.cam_name:
try:
send_all_async({
'raw_point': self.nt_data['raw_point'],
'point': {
'pitch': self.nt_data['pitch'],
'yaw': self.nt_data['yaw'],
'fps': self.nt_data['fps']
}
})
except:
pass
threading.Thread(target=_publish_thread).start()
threading.Thread(target=_socket_thread).start()
while True:
#sending and reciving data from opencv sub process
pipeline = self.settings_manager.cams[self.cam_name]["pipelines"][
self.settings_manager.cams_curr_pipeline[self.cam_name]]
socket.send_json(dict(
pipeline=pipeline,
driver_mode=self.table.getBoolean('Driver_Mode', False)
), zmq.SNDMORE)
socket.send_pyobj((self.time_stamp,self.image))
self.p_image = socket.recv_pyobj()
self.nt_data = socket.recv_json()
def camera_process(self, cam_name, port, FOV):
from fractions import Fraction
#calc fov
diagonalView = math.radians(FOV)
width = self.settings_manager.cams[cam_name]["video_mode"]["width"]
height = self.settings_manager.cams[cam_name]["video_mode"]["height"]
centerX = (width / 2) - .5
centerY = (height / 2) - .5
cam_area = width * height
aspect_fraction = Fraction(width, height)
horizontal_ratio = aspect_fraction.numerator
vertical_ratio = aspect_fraction.denominator
horizontalView = math.atan(math.tan(diagonalView / 2) * (horizontal_ratio / diagonalView)) * 2
verticalView = math.atan(math.tan(diagonalView / 2) * (vertical_ratio / diagonalView)) * 2
H_FOCAL_LENGTH = width / (2 * math.tan((horizontalView / 2)))
V_FOCAL_LENGTH = height / (2 * math.tan((verticalView / 2)))
#setting up zmq socket
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect('tcp://localhost:%s' % str(port))
#setting up filter countours class
filter_contours = self.vision_handler.Filter_Contours(center_x=centerX, center_y=centerY)
x = 1
counter = 0
start_time = time.time()
fps = 0
while True:
obj = socket.recv_json()
curr_pipeline = obj['pipeline']
driver_mode = obj['driver_mode']
time_stamp, image = socket.recv_pyobj()
if curr_pipeline['orientation'] == "Inverted":
M = cv2.getRotationMatrix2D((width / 2, height / 2), 180, 1)
image = cv2.warpAffine(image, M, (width, height))
if not driver_mode:
hsv_image = self.vision_handler._hsv_threshold(curr_pipeline["hue"],
curr_pipeline["saturation"], curr_pipeline["value"],
image, curr_pipeline["erode"], curr_pipeline["dilate"])
# if table.getBoolean("Driver_Mode", False):
contours = self.vision_handler.find_contours(hsv_image)
filtered_contours = filter_contours.filter_contours(input_contours=contours, area=curr_pipeline['area'],
ratio=curr_pipeline['ratio'],
extent=curr_pipeline['extent'],
sort_mode=curr_pipeline['sort_mode'], cam_area=cam_area,
target_grouping=curr_pipeline['target_group'],
target_intersection=
curr_pipeline['target_intersection'])
final_contour = self.vision_handler.output_contour(filtered_contours)
try:
center = final_contour[0]
if curr_pipeline["M"] == 1 and curr_pipeline["B"] == 0:
center_x = centerX
center_y = centerY
else:
center_x = (center[1] - curr_pipeline['B']) / curr_pipeline["M"]
center_y = (center[0] * curr_pipeline["M"]) + curr_pipeline["B"]
pitch = self.vision_handler.calculate_pitch(pixel_y=center[1], center_y=center_y, v_focal_length=V_FOCAL_LENGTH)
yaw = self.vision_handler.calculate_yaw(pixel_x=center[0], center_x=center_x, h_focal_length=H_FOCAL_LENGTH)
valid = True
except IndexError:
center = None
pitch = None
yaw = None
valid = False
if curr_pipeline['is_binary']:
draw_image = hsv_image
else:
draw_image = image
res = self.vision_handler.draw_image(input_image=draw_image, contour=final_contour)
else:
res = image
center = None
pitch = None
yaw = None
valid = False
socket.send_pyobj(res)
socket.send_json(dict(
pitch=pitch,
yaw=yaw,
valid=valid,
raw_point=center,
fps=fps,
time_stamp=time_stamp
))
counter += 1
if (time.time() - start_time) > x:
fps = (counter / (time.time() - start_time))
counter = 0
start_time = time.time()

View File

@@ -1,49 +0,0 @@
import subprocess
import netifaces
class ChangeIP:
def __init__(self, connection_type, ip, netmask, gateway, hostname):
adapter = self.find_adapter()
if adapter is not None:
self.shutdown_adapter(adapter)
if connection_type == "DHCP":
self.change_to_dhcp(adapter=adapter)
elif connection_type == "Static":
self.change_to_static(adapter=adapter, ip=ip, netmask=netmask, gateway=gateway)
self.start_adapter(adapter)
else:
print("not connected to robot radio cannot set ip")
self.change_hostname(hostname=hostname)
@staticmethod
def change_to_dhcp(adapter):
subprocess.call(['dhclient',"-r", adapter])
@staticmethod
def change_to_static(adapter, ip, netmask, gateway):
subprocess.call(['ifconfig', adapter, ip, 'netmask', netmask])
subprocess.call(['route', 'add', 'default', 'gw', gateway, adapter])
@staticmethod
def shutdown_adapter(adapter):
subprocess.call(['ifconfig', adapter, 'down'])
@staticmethod
def start_adapter(adapter):
subprocess.call(['ifconfig', adapter, 'up'])
@staticmethod
def find_adapter():
for i_name in netifaces.interfaces():
interface = netifaces.ifaddresses(i_name)[netifaces.AF_INET][0]
address = interface['addr'].split('.')[0]
if address == "10":
return str(i_name)
@staticmethod
def change_hostname(hostname):
subprocess.call(['hostnamectl', 'set-hostname', hostname])

View File

@@ -1,6 +0,0 @@
import tornado.web
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.render("index.html")

View File

@@ -1,123 +0,0 @@
import tornado.websocket
import json
from ..classes.Exceptions import NoCameraConnectedException
from ..classes.SettingsManager import SettingsManager
web_socket_clients = set()
def send_all_async(message):
for ws in web_socket_clients:
try:
ws.write_message(json.dumps(message))
except AssertionError as a:
pass
class ChameleonWebSocket(tornado.websocket.WebSocketHandler):
actions = {}
set_this_camera_settings = ["exposure", "brightness"]
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self.settings_manager = SettingsManager()
self.init_actions()
def init_actions(self):
self.actions["change_pipeline_values"] = self.change_pipeline_values
self.actions["change_general_settings_values"] = self.settings_manager.change_general_settings_values
self.actions["curr_camera"] = self.change_curr_camera
self.actions["curr_pipeline"] = self.change_curr_pipeline
self.actions['resolution'] = self.set_resolution
self.actions['FOV'] = self.set_fov
def open(self):
self.send_full_settings()
if self not in web_socket_clients:
web_socket_clients.add(self)
print("WebSocket opened")
def on_message(self, message):
try:
message_dic = json.loads(message)
for key in message_dic:
self.actions.get(key, self.actions["change_pipeline_values"])(message_dic)
print(message)
except Exception as e:
print("crash " + e)
def on_close(self):
self.settings_manager.save_settings()
if self in web_socket_clients:
web_socket_clients.remove(self)
print("WebSocket closed")
def check_origin(self, origin):
return True
def set_resolution(self, message):
self.settings_manager.get_curr_cam()['resolution'] = message['resolution']
SettingsManager().set_camera_settings(camera_name=SettingsManager().general_settings['curr_camera'],
dic=message)
self.settings_manager.save_settings()
def set_fov(self, message):
self.settings_manager.get_curr_cam()['FOV'] = message['FOV']
self.settings_manager.save_settings()
def send_curr_pipeline(self):
try:
self.write_message(self.settings_manager.get_curr_pipeline())
except NoCameraConnectedException:
# TODO: return something if no camera connected
self.write_message("No camera connected")
def send_curr_cam(self):
try:
self.write_message(self.settings_manager.get_curr_cam())
except NoCameraConnectedException:
# TODO: return something if no camera connected
self.write_message("No camera connected")
def send_curr_port(self):
self.write_message({
'port': self.settings_manager.cams_port[self.settings_manager.general_settings["curr_camera"]]
})
def send_full_settings(self):
full_settings = self.settings_manager.general_settings.copy()
full_settings["cameraList"] = list(self.settings_manager.cams.copy().keys())
try:
full_settings.update(self.settings_manager.get_curr_pipeline())
full_settings["pipelineList"] = list(self.settings_manager.cams[self.settings_manager.general_settings["curr_camera"]]["pipelines"].keys())
full_settings["resolutionList"] = self.settings_manager.get_resolution_list()
full_settings['resolution'] = self.settings_manager.get_curr_cam()['resolution']
full_settings['FOV'] = self.settings_manager.get_curr_cam()['FOV']
full_settings['port'] = self.settings_manager.cams_port[self.settings_manager.general_settings["curr_camera"]]
except NoCameraConnectedException:
# TODO: return something if no camera connected
full_settings["data"] = None
self.write_message(full_settings)
def change_curr_camera(self, dic):
self.settings_manager.set_curr_camera(cam_name=dic["curr_camera"])
self.send_curr_port()
self.send_curr_cam()
def change_curr_pipeline(self, dic):
self.settings_manager.set_curr_pipeline(pipe_name=dic["curr_pipeline"])
self.settings_manager.cams_curr_pipeline[self.settings_manager.general_settings['curr_camera']] = dic["curr_pipeline"]
self.send_curr_pipeline()
def change_pipeline_values(self, dic):
self.settings_manager.change_pipeline_values(dic)
for key in self.set_this_camera_settings:
if key in dic:
self.settings_manager.set_camera_settings(self.settings_manager.general_settings["curr_camera"],
dic)

View File

@@ -1,221 +0,0 @@
import cv2
import numpy
import math
from enum import Enum, unique
class VisionHandler():
def __init__(self):
self.kernel = numpy.ones((5, 5), numpy.uint8)
def _hsv_threshold(self, hue: list, saturation: list, value: list, img: numpy.ndarray, is_erode: bool,
is_dilate: bool):
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
thresh = cv2.inRange(hsv, (hue[0], saturation[0], value[0]), (hue[1], saturation[1], value[1]))
erode_img = cv2.erode(thresh, kernel=self.kernel, iterations=is_erode)
dilate_img = cv2.dilate(erode_img, kernel=self.kernel, iterations=is_dilate)
return dilate_img
def find_contours(self, binary_img: numpy.ndarray):
_,contours, _ = cv2.findContours(binary_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)
return contours
class Filter_Contours:
def __init__(self,center_x, center_y):
self.sort_mode = self.SortMode(center_x=center_x, center_y=center_y)
self.center_y = center_y
self.center_x = center_x
class SortMode:
def __init__(self, center_x, center_y):
self.center_x = center_x
self.center_y = center_y
@classmethod
def moment_x(cls,contour):
M = cv2.moments(contour)
try:
x = float(M['m10'] / M['m00'])
except ZeroDivisionError:
x = 0
return x
@classmethod
def moment_y(cls, contour):
M = cv2.moments(contour)
try:
y = float(M['m01'] / M['m00'])
except ZeroDivisionError:
y = 0
return y
@classmethod
def calc_distance(cls,contour, center_x, center_y):
M = cv2.moments(contour)
try:
x = int(M['m10'] / M['m00'])
except ZeroDivisionError:
x = 0
try:
y = int(M['m01'] / M['m00'])
except ZeroDivisionError:
y = 0
# this function was suggested by my girlfriend maya jugend that i really love
return math.sqrt((center_x-x)**2 + (center_y-y)**2)
def Largest(self, input_contours):
return sorted(input_contours, key=lambda x: cv2.contourArea(x), reverse=True)
def Smallest(self, input_contours):
return sorted(input_contours, key=lambda x: cv2.contourArea(x))
def Highest(self, input_contours):
return sorted(input_contours, key=lambda x: self.moment_y(x))
def Lowest(self, input_contours):
return sorted(input_contours, key=lambda x: self.moment_y(x),reverse=True)
def Rightmost(self, input_contours):
return sorted(input_contours, key=lambda x: self.moment_x(x), reverse=True)
def Leftmost(self, input_contours):
return sorted(input_contours, key=lambda x: self.moment_x(x))
def Closest(self, input_contours):
return sorted(input_contours, key=lambda x: self.calc_distance(x, center_x=self.center_x,
center_y=self.center_y), reverse=True)
def filter_contours(self, input_contours, cam_area, area, ratio, extent, sort_mode, target_grouping,
target_intersection):
class TargetGroup(Enum):
Single = 1
Dual = 2
Triple = 3
Quadruple = 4
Quintuple = 6
def group_target(i_contours, target_group, intersection_point):
def is_intersecting(contour_a, contour_b, intersection_direction):
[vx_a, vy_a, x0_a, y0_a] = cv2.fitLine(contour_a, cv2.DIST_L2, 0, 0.01, 0.01)
[vx_b, vy_b, x0_b, y0_b] = cv2.fitLine(contour_b, cv2.DIST_L2, 0, 0.01, 0.01)
# getting line data of both contours
m_a = vy_a / vx_a
m_b = vy_b / vx_b
# calculating slope of both lines
try:
intersection_x = ((m_a * x0_a) - y0_a - (m_b * x0_b) + y0_b) / (m_a - m_b)
except ZeroDivisionError:
if intersection_direction == 'Parallel':
return True
else:
return False
intersection_y = (m_a * (intersection_x - x0_a)) + y0_a
# finding intersection point
if intersection_direction == 'Up':
if intersection_y < self.center_y:
return True
elif intersection_direction == 'Down':
if intersection_y > self.center_y:
return True
elif intersection_direction == 'Left':
if intersection_x < self.center_x:
return True
elif intersection_direction == 'Right':
if intersection_x > self.center_x:
return True
else:
return False
if target_group != TargetGroup.Single:
f_contour_list = []
for index, g_contour in enumerate(i_contours):
final_contour = g_contour
for c in range(target_group.value - 1):
try:
first_contour = i_contours[index + c]
second_contour = i_contours[index + c + 1]
except IndexError:
final_contour = []
break
if is_intersecting(first_contour, second_contour, intersection_point):
final_contour = numpy.concatenate((final_contour, second_contour))
else:
final_contour = []
break
if final_contour != []:
f_contour_list.append(final_contour)
return f_contour_list
else:
return i_contours
'''start of the first filtration of contours'''
filtered_contours = []
for contour in input_contours:
try:
contour_area = cv2.contourArea(contour)
target_area = float(contour_area / cam_area)*100
if target_area >= area[1] or target_area <= area[0]:
continue
rect = cv2.minAreaRect(contour)
bounding_rect_area = rect[1][0] * rect[1][1]
try:
target_fullness = float(contour_area / bounding_rect_area)*100
except ZeroDivisionError:
target_fullness = 0
if target_fullness <= extent[0] or target_fullness >= extent[1]:
continue
try:
aspect_ratio = float(rect[1][0]/rect[1][1])
except ZeroDivisionError:
aspect_ratio = 0
if aspect_ratio <= ratio[0] or aspect_ratio >= ratio[1]:
continue
filtered_contours.append(contour)
except Exception as e:
print(e)
continue
#checking for contour grouping before sorting
grouped_contours = group_target(filtered_contours, TargetGroup[target_grouping], target_intersection)
try:
sorted_contours = getattr(self.sort_mode, sort_mode)(grouped_contours)
except TypeError:
sorted_contours = []
return sorted_contours
def output_contour(self, sorted_contours):
if len(sorted_contours) > 0:
selected_contour = sorted_contours[0]
rect = cv2.minAreaRect(selected_contour)
else:
return []
return rect
def draw_image(self, input_image, contour):
if len(input_image.shape)<3:
input_image = cv2.cvtColor(input_image, cv2.COLOR_GRAY2RGB)
if contour != []:
box = cv2.boxPoints(contour)
box = numpy.int0(box)
cv2.drawContours(input_image, [box], 0, (0, 0, 255), 3)
# center_point = (int(rectangle[0][0]), int(rectangle[0][1]))
# cv2.circle(input_image, center_point, 0, (0, 255, 0), thickness=3, lineType=8, shift=0)
return input_image
def calculate_pitch(self, pixel_y, center_y, v_focal_length):
pitch = math.degrees(math.atan((pixel_y - center_y) / v_focal_length))
pitch *= -1
return pitch
def calculate_yaw(self, pixel_x, center_x, h_focal_length):
yaw = math.degrees(math.atan((pixel_x - center_x) / h_focal_length))
return yaw

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

Before

Width:  |  Height:  |  Size: 12 KiB

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 542 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 70 KiB

View File

@@ -1 +0,0 @@
<!DOCTYPE html><html lang=en><head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><link rel=icon href=/favicon.png><title>Chameleon Vision</title><link href=/css/app.8be123c7.css rel=preload as=style><link href=/js/app.fd9292a1.js rel=preload as=script><link href=/js/chunk-vendors.a3ecb371.js rel=preload as=script><link href=/css/app.8be123c7.css rel=stylesheet></head><body><noscript><strong>We're sorry but Chameleon Vision doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id=app></div><script src=/js/chunk-vendors.a3ecb371.js></script><script src=/js/app.fd9292a1.js></script></body></html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,4 +0,0 @@
tornado
pyzmq
robotpy-cscore
netifaces

View File

@@ -1,10 +0,0 @@
#!/bin/bash
apt-get update
apt-get dist-upgrade
apt-get upgrade
apt-get install python3-pip python3-dev cmake zip unzip build-essential git libnss-mdns --fix-missing
apt-get install python3-numpy
apt-get install python3-opencv
pip3 install robotpy-cscore
pip3 install pyzmq
pip3 install tornado

4088
logo.ai

File diff suppressed because one or more lines are too long

96
package-lock.json generated
View File

@@ -1,96 +0,0 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-validator": {
"version": "1.10.1",
"resolved": "https://registry.npmjs.org/async-validator/-/async-validator-1.10.1.tgz",
"integrity": "sha512-VLiLKZuJc8VIeAMC3YobVsZov8XPNhbwyIkKjhPW5cFnhZXH+HHJpkE270YMD/6zJIOJXUN/Cq0t3fR7XPwaDQ==",
"requires": {
"babel-runtime": "6.x"
}
},
"babel-runtime": {
"version": "6.26.0",
"resolved": "https://registry.npmjs.org/babel-runtime/-/babel-runtime-6.26.0.tgz",
"integrity": "sha1-llxwWGaOgrVde/4E/yM3vItWR/4=",
"requires": {
"core-js": "^2.4.0",
"regenerator-runtime": "^0.11.0"
}
},
"batch-processor": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/batch-processor/-/batch-processor-1.0.0.tgz",
"integrity": "sha1-dclcMrdI4IUNEMKxaPa9vpiRrOg="
},
"core-js": {
"version": "2.6.5",
"resolved": "https://registry.npmjs.org/core-js/-/core-js-2.6.5.tgz",
"integrity": "sha512-klh/kDpwX8hryYL14M9w/xei6vrv6sE8gTHDG7/T/+SEovB/G4ejwcfE/CBzO6Edsu+OETZMZ3wcX/EjUkrl5A=="
},
"deepmerge": {
"version": "2.2.1",
"resolved": "https://registry.npmjs.org/deepmerge/-/deepmerge-2.2.1.tgz",
"integrity": "sha512-R9hc1Xa/NOBi9WRVUWg19rl1UB7Tt4kuPd+thNJgFZoxXsTz7ncaPaeIm+40oSGuP33DfMb4sZt1QIGiJzC4EA=="
},
"element-resize-detector": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/element-resize-detector/-/element-resize-detector-1.2.0.tgz",
"integrity": "sha512-UmhNB8sIJVZeg56gEjgmMd6p37sCg8j8trVW0LZM7Wzv+kxQ5CnRHcgRKBTB/kFUSn3e7UP59kl2V2U8Du1hmg==",
"requires": {
"batch-processor": "1.0.0"
}
},
"iview": {
"version": "3.3.0",
"resolved": "https://registry.npmjs.org/iview/-/iview-3.3.0.tgz",
"integrity": "sha512-PyqhfxEO9/4rcDNZ1FhMMGjcKh/y/F1p/00/UXrNxXBnih4rIb8GxgxYk0Y14bzuCL/AuMAisOBbQm51o5iOlQ==",
"requires": {
"async-validator": "^1.10.0",
"deepmerge": "^2.2.1",
"element-resize-detector": "^1.2.0",
"js-calendar": "^1.2.3",
"lodash.throttle": "^4.1.1",
"popper.js": "^1.14.6",
"tinycolor2": "^1.4.1",
"v-click-outside-x": "^3.5.6"
}
},
"js-calendar": {
"version": "1.2.3",
"resolved": "https://registry.npmjs.org/js-calendar/-/js-calendar-1.2.3.tgz",
"integrity": "sha512-dAA1/Zbp4+c5E+ARCVTIuKepXsNLzSYfzvOimiYD4S5eeP9QuplSHLcdhfqFSwyM1o1u6ku6RRRCyaZ0YAjiBw=="
},
"lodash.throttle": {
"version": "4.1.1",
"resolved": "https://registry.npmjs.org/lodash.throttle/-/lodash.throttle-4.1.1.tgz",
"integrity": "sha1-wj6RtxAkKscMN/HhzaknTMOb8vQ="
},
"popper.js": {
"version": "1.14.7",
"resolved": "https://registry.npmjs.org/popper.js/-/popper.js-1.14.7.tgz",
"integrity": "sha512-4q1hNvoUre/8srWsH7hnoSJ5xVmIL4qgz+s4qf2TnJIMyZFUFMGH+9vE7mXynAlHSZ/NdTmmow86muD0myUkVQ=="
},
"regenerator-runtime": {
"version": "0.11.1",
"resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.11.1.tgz",
"integrity": "sha512-MguG95oij0fC3QV3URf4V2SDYGJhJnJGqvIIgdECeODCT98wSWDAJ94SSuVpYQUoTcGUIL6L4yNB7j1DFFHSBg=="
},
"tinycolor2": {
"version": "1.4.1",
"resolved": "https://registry.npmjs.org/tinycolor2/-/tinycolor2-1.4.1.tgz",
"integrity": "sha1-9PrTM0R7wLB9TcjpIJ2POaisd+g="
},
"undefined": {
"version": "0.1.0",
"resolved": "https://registry.npmjs.org/undefined/-/undefined-0.1.0.tgz",
"integrity": "sha1-m3BqSzKtMMIMpP5l3cu72sMr3tA="
},
"v-click-outside-x": {
"version": "3.7.1",
"resolved": "https://registry.npmjs.org/v-click-outside-x/-/v-click-outside-x-3.7.1.tgz",
"integrity": "sha512-WmUgmcIXr9clVpm1AYS/FgHtcDicfnfoxgQCNg4O6vfk9GVnxA0vSqO321ogUo0b7czYTidj7fQENvWFMWOkUg=="
}
}
}