mirror of
https://github.com/PhotonVision/photonvision
synced 2026-06-25 01:41:40 +00:00
working send and revice using zmq at high speeds
This commit is contained in:
232
backend/app/classes/imagezmq.py
Normal file
232
backend/app/classes/imagezmq.py
Normal file
@@ -0,0 +1,232 @@
|
||||
""" imagezmq: Transport OpenCV images via ZMQ.
|
||||
|
||||
Classes that transport OpenCV images from one computer to another. For example,
|
||||
OpenCV images gathered by a Raspberry Pi camera could be sent to another
|
||||
computer for displaying the images using cv2.imshow() or for further image
|
||||
processing. See API and Usage Examples for details.
|
||||
|
||||
Copyright (c) 2017 by Jeff Bass.
|
||||
License: MIT, see LICENSE for more details.
|
||||
"""
|
||||
|
||||
import zmq
|
||||
import numpy as np
|
||||
import cv2
|
||||
|
||||
|
||||
class ImageSender():
|
||||
"""Opens zmq REQ socket and sends images.
|
||||
|
||||
Opens a zmq REQ socket on the image sending computer, often a
|
||||
Raspberry Pi, that will be sending OpenCV images and
|
||||
related text messages to the hub computer. Provides methods to
|
||||
send images or send jpg compressed images.
|
||||
|
||||
Arguments:
|
||||
connect_to: the tcp address:port of the hub computer.
|
||||
"""
|
||||
|
||||
def __init__(self, connect_to='tcp://127.0.0.1:5555'):
|
||||
"""Initializes zmq socket for sending images to the hub.
|
||||
|
||||
Expects an open socket at the connect_to tcp address; it will
|
||||
connect to that remote socket after setting up the REQ
|
||||
socket on this computer.
|
||||
"""
|
||||
|
||||
self.zmq_context = SerializingContext()
|
||||
self.zmq_socket = self.zmq_context.socket(zmq.REQ)
|
||||
self.zmq_socket.connect(connect_to)
|
||||
|
||||
def send_image(self, msg, image):
|
||||
"""Sends OpenCV image and msg to hub computer.
|
||||
|
||||
Arguments:
|
||||
msg: text message or image name.
|
||||
image: OpenCV image to send to hub.
|
||||
|
||||
Returns:
|
||||
A text reply from hub.
|
||||
"""
|
||||
|
||||
if image.flags['C_CONTIGUOUS']:
|
||||
# if image is already contiguous in memory just send it
|
||||
self.zmq_socket.send_array(image, msg, copy=False)
|
||||
else:
|
||||
# else make it contiguous before sending
|
||||
image = np.ascontiguousarray(image)
|
||||
self.zmq_socket.send_array(image, msg, copy=False)
|
||||
hub_reply = self.zmq_socket.recv() # receive the reply message
|
||||
return hub_reply
|
||||
|
||||
def send_jpg(self, msg, jpg_buffer):
|
||||
"""Sends msg text and jpg buffer to hub computer.
|
||||
|
||||
Arguments:
|
||||
msg: image name or message text.
|
||||
jpg_buffer: bytestring containing the jpg image to send to hub.
|
||||
Returns:
|
||||
A text reply from hub.
|
||||
"""
|
||||
|
||||
self.zmq_socket.send_jpg(msg, jpg_buffer, copy=False)
|
||||
hub_reply = self.zmq_socket.recv() # receive the reply message
|
||||
return hub_reply
|
||||
|
||||
|
||||
class ImageHub():
|
||||
"""Opens zmq REP socket and receives images.
|
||||
|
||||
Opens a zmq REP socket on the hub compuer, for example,
|
||||
a Mac, that will be receiving and displaying or processing OpenCV images
|
||||
and related text messages. Provides methods to receive images or receive
|
||||
jpg compressed images.
|
||||
|
||||
Arguments:
|
||||
open_port: (optional) the socket to open for receiving REQ requests.
|
||||
"""
|
||||
|
||||
def __init__(self, open_port='tcp://*:5555'):
|
||||
"""Initializes zmq REP socket to receive images and text.
|
||||
"""
|
||||
|
||||
self.zmq_context = SerializingContext()
|
||||
self.zmq_socket = self.zmq_context.socket(zmq.REP)
|
||||
self.zmq_socket.bind(open_port)
|
||||
|
||||
def recv_image(self, copy=False):
|
||||
"""Receives OpenCV image and text msg.
|
||||
|
||||
Arguments:
|
||||
copy: (optional) zmq copy flag.
|
||||
|
||||
Returns:
|
||||
msg: text msg, often the image name.
|
||||
image: OpenCV image.
|
||||
"""
|
||||
|
||||
msg, image = self.zmq_socket.recv_array(copy=False)
|
||||
return msg, image
|
||||
|
||||
def recv_jpg(self, copy=False):
|
||||
"""Receives text msg, jpg buffer.
|
||||
|
||||
Arguments:
|
||||
copy: (optional) zmq copy flag
|
||||
Returns:
|
||||
msg: text message, often image name
|
||||
jpg_buffer: bytestring jpg compressed image
|
||||
"""
|
||||
|
||||
msg, jpg_buffer = self.zmq_socket.recv_jpg(copy=False)
|
||||
return msg, jpg_buffer
|
||||
|
||||
def send_reply(self, reply_message=b'OK'):
|
||||
"""Sends the zmq REP reply message.
|
||||
|
||||
Arguments:
|
||||
reply_message: reply message text, often just string 'OK'
|
||||
"""
|
||||
self.zmq_socket.send(reply_message)
|
||||
|
||||
|
||||
class SerializingSocket(zmq.Socket):
|
||||
"""Numpy array serialization methods.
|
||||
|
||||
Modelled on PyZMQ serialization examples.
|
||||
|
||||
Used for sending / receiving OpenCV images, which are Numpy arrays.
|
||||
Also used for sending / receiving jpg compressed OpenCV images.
|
||||
"""
|
||||
|
||||
def send_array(self, A, msg='NoName', flags=0, copy=True, track=False):
|
||||
"""Sends a numpy array with metadata and text message.
|
||||
|
||||
Sends a numpy array with the metadata necessary for reconstructing
|
||||
the array (dtype,shape). Also sends a text msg, often the array or
|
||||
image name.
|
||||
|
||||
Arguments:
|
||||
A: numpy array or OpenCV image.
|
||||
msg: (optional) array name, image name or text message.
|
||||
flags: (optional) zmq flags.
|
||||
copy: (optional) zmq copy flag.
|
||||
track: (optional) zmq track flag.
|
||||
"""
|
||||
|
||||
md = dict(
|
||||
msg=msg,
|
||||
dtype=str(A.dtype),
|
||||
shape=A.shape,
|
||||
)
|
||||
self.send_json(md, flags | zmq.SNDMORE)
|
||||
return self.send(A, flags, copy=copy, track=track)
|
||||
|
||||
def send_jpg(self,
|
||||
msg='NoName',
|
||||
jpg_buffer=b'00',
|
||||
flags=0,
|
||||
copy=True,
|
||||
track=False):
|
||||
"""Send a jpg buffer with a text message.
|
||||
|
||||
Sends a jpg bytestring of an OpenCV image.
|
||||
Also sends text msg, often the image name.
|
||||
|
||||
Arguments:
|
||||
msg: image name or text message.
|
||||
jpg_buffer: jpg buffer of compressed image to be sent.
|
||||
flags: (optional) zmq flags.
|
||||
copy: (optional) zmq copy flag.
|
||||
track: (optional) zmq track flag.
|
||||
"""
|
||||
|
||||
md = dict(msg=msg, )
|
||||
self.send_json(md, flags | zmq.SNDMORE)
|
||||
return self.send(jpg_buffer, flags, copy=copy, track=track)
|
||||
|
||||
def recv_array(self, flags=0, copy=True, track=False):
|
||||
"""Receives a numpy array with metadata and text message.
|
||||
|
||||
Receives a numpy array with the metadata necessary
|
||||
for reconstructing the array (dtype,shape).
|
||||
Returns the array and a text msg, often the array or image name.
|
||||
|
||||
Arguments:
|
||||
flags: (optional) zmq flags.
|
||||
copy: (optional) zmq copy flag.
|
||||
track: (optional) zmq track flag.
|
||||
|
||||
Returns:
|
||||
msg: image name or text message.
|
||||
A: numpy array or OpenCV image reconstructed with dtype and shape.
|
||||
"""
|
||||
|
||||
md = self.recv_json(flags=flags)
|
||||
msg = self.recv(flags=flags, copy=copy, track=track)
|
||||
A = np.frombuffer(msg, dtype=md['dtype'])
|
||||
return (md['msg'], A.reshape(md['shape']))
|
||||
|
||||
def recv_jpg(self, flags=0, copy=True, track=False):
|
||||
"""Receives a jpg buffer and a text msg.
|
||||
|
||||
Receives a jpg bytestring of an OpenCV image.
|
||||
Also receives a text msg, often the image name.
|
||||
|
||||
Arguments:
|
||||
flags: (optional) zmq flags.
|
||||
copy: (optional) zmq copy flag.
|
||||
track: (optional) zmq track flag.
|
||||
|
||||
Returns:
|
||||
msg: image name or text message.
|
||||
jpg_buffer: bytestring, containing jpg image.
|
||||
"""
|
||||
|
||||
md = self.recv_json(flags=flags) # metadata text
|
||||
jpg_buffer = self.recv(flags=flags, copy=copy, track=track)
|
||||
return (md['msg'], jpg_buffer)
|
||||
|
||||
|
||||
class SerializingContext(zmq.Context):
|
||||
_socket_class = SerializingSocket
|
||||
@@ -9,6 +9,8 @@ import time
|
||||
from multiprocessing import Process
|
||||
import threading
|
||||
import zmq
|
||||
import base64
|
||||
|
||||
|
||||
|
||||
class VisionHandler(metaclass=Singleton):
|
||||
@@ -78,10 +80,10 @@ class VisionHandler(metaclass=Singleton):
|
||||
port = 5550
|
||||
|
||||
for cam_name in SettingsManager().usb_cameras:
|
||||
threading.Thread(target=self.thread_proc, args=(cs, cam_name, str(port))).start()
|
||||
threading.Thread(target=self.thread_proc, args=(cs, cam_name, port)).start()
|
||||
port += 1
|
||||
|
||||
def thread_proc(self, cs, cam_name, port="5557"):
|
||||
def thread_proc(self, cs, cam_name, port=5557):
|
||||
cv_sink = cs.getVideo(camera=SettingsManager.usb_cameras[cam_name])
|
||||
|
||||
width = SettingsManager().cams[cam_name]["video_mode"]["width"]
|
||||
@@ -92,30 +94,26 @@ class VisionHandler(metaclass=Singleton):
|
||||
cv_publish = cs.putVideo(name=cam_name, width=width, height=height)
|
||||
|
||||
context = zmq.Context()
|
||||
socket = context.socket(zmq.REQ)
|
||||
socket.bind("tcp://*:%s" % port)
|
||||
socket = context.socket(zmq.PAIR)
|
||||
socket.bind('tcp://*:%s' % str(port))
|
||||
|
||||
p = Process(target=self.camera_process, args=(cam_name, port))
|
||||
p.start()
|
||||
|
||||
pipeline = SettingsManager().cams[cam_name]["pipelines"]["pipeline0"]
|
||||
while True:
|
||||
# start = time.time()
|
||||
if(pipeline != SettingsManager().cams[cam_name]["pipelines"]["pipeline0"]):
|
||||
|
||||
pipeline = SettingsManager().cams[cam_name]["pipelines"]["pipeline0"]
|
||||
|
||||
# start = time.time(
|
||||
_, image = cv_sink.grabFrame(image)
|
||||
socket.send_pyobj({'image': image,
|
||||
'pipeline': pipeline})
|
||||
# end = time.time()
|
||||
image = socket.recv_pyobj()
|
||||
cv_publish.putFrame(image)
|
||||
|
||||
socket.send_json(dict(
|
||||
pipeline=pipeline
|
||||
))
|
||||
socket.send_pyobj(image)
|
||||
p_image = socket.recv_pyobj()
|
||||
cv_publish.putFrame(p_image)
|
||||
# print(cam_name + " " + str(1 / (end - start)))
|
||||
|
||||
def camera_process(self, cam_name, port):
|
||||
|
||||
|
||||
|
||||
# def change_camera_values():
|
||||
# camera.setBrightness(0)
|
||||
# camera.setExposureManual(0)
|
||||
@@ -141,12 +139,12 @@ class VisionHandler(metaclass=Singleton):
|
||||
cam_area = width * height
|
||||
|
||||
context = zmq.Context()
|
||||
socket = context.socket(zmq.REP)
|
||||
socket.connect("tcp://localhost:%s" % port)
|
||||
socket = context.socket(zmq.PAIR)
|
||||
socket.connect('tcp://localhost:%s' % str(port))
|
||||
|
||||
while True:
|
||||
obj = socket.recv_pyobj()
|
||||
image = obj['image']
|
||||
obj = socket.recv_json()
|
||||
image = socket.recv_pyobj()
|
||||
curr_pipeline = obj["pipeline"]
|
||||
hsv_image = self._hsv_threshold(curr_pipeline["hue"],
|
||||
curr_pipeline["saturation"], curr_pipeline["value"],
|
||||
@@ -156,4 +154,7 @@ class VisionHandler(metaclass=Singleton):
|
||||
filtered_contours = self.filter_contours(contours, cam_area, curr_pipeline["area"], curr_pipeline["ratio"],
|
||||
curr_pipeline["extent"])
|
||||
res = self.draw_image(input_image=image, is_binary=False, rectangles=filtered_contours)
|
||||
# cv2.putText(res, str(fps), (10, 200), font, 4, (0, 0, 0), 2, cv2.LINE_AA)
|
||||
socket.send_pyobj(res)
|
||||
|
||||
|
||||
|
||||
@@ -1 +1 @@
|
||||
{"pipelines": {"pipeline0": {"exposure": 25, "brightness": 19, "orientation": "Normal", "resolution": 11, "hue": [0, 10], "saturation": [58, 69], "value": [61, 87], "erode": false, "dilate": false, "area": [0, 100], "ratio": [0, 20], "extent": [0, 100], "is_binary": "Normal"}}, "path": "/dev/v4l/by-path/pci-0000:02:03.0-usb-0:1:1.0-video-index0", "video_mode": {"fps": 50, "width": 640, "height": 480, "pixel_format": "kYUYV"}}
|
||||
{"pipelines": {"pipeline0": {"exposure": 25, "brightness": 19, "orientation": "Normal", "resolution": 1, "hue": [0, 10], "saturation": [58, 69], "value": [61, 87], "erode": false, "dilate": false, "area": [0, 100], "ratio": [0, 20], "extent": [0, 100], "is_binary": "Normal"}}, "path": "/dev/v4l/by-path/pci-0000:02:03.0-usb-0:1:1.0-video-index0", "video_mode": {"fps": 150, "width": 320, "height": 240, "pixel_format": "kYUYV"}}
|
||||
Reference in New Issue
Block a user