Add Websocket Camera Streaming (#529)

* WIP adding second websocket handling for cameras

* just more WIP

* even more wip. Most java-side framework completed, but not yet debugged

* IT LIVES. Still needs lots of cleanup. But we're transferring and displaying data!

* moved down an architecture layer. Improved multiple-camera handling

* Additional WIP to help improve smoothness and performance, though not yet tested

* bugfixes galore

* tweak compression

* spotless

* more tweaks for handling slow/intermittent streams

* wpilibformat maybe?

* clang-format maybe?

* WIP - adding thinclient. I don't like it yet, it should be more auto-generated than it is.

* thinclient formatting fixups

* Reduced amount of empty send data by limiting to only one stream per client (which is all we really need). Framerate is up slightly, overhead is down.

* bugfixes, faster streaming, better mjpeg compression settings, thinclient working

* spotless and formatting

* cmon wpiformat....

* re-added mjpg streams

* added a loading GIF to imporve the feeling of responsiveness

* formatting

* urlparams and built-in thinclient

* wpiformat

* prevent wpiformat complaints

* Removed uint8 array and base64 conversion from client side

* Synced up js implementations for ws streaming

* formatting/spotless
This commit is contained in:
Chris Gerth
2022-10-30 13:16:17 -05:00
committed by GitHub
parent b68b0ca5f6
commit aaac6a4fbb
23 changed files with 890 additions and 112 deletions

View File

@@ -438,7 +438,7 @@ public class ConfigManager {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error("Exception waiting for settings semaphor", e);
logger.error("Exception waiting for settings semaphore", e);
}
}
}

View File

@@ -40,7 +40,6 @@ import org.photonvision.vision.camera.USBCameraSource;
import org.photonvision.vision.camera.ZeroCopyPicamSource;
import org.photonvision.vision.frame.Frame;
import org.photonvision.vision.frame.consumer.FileSaveFrameConsumer;
import org.photonvision.vision.frame.consumer.MJPGFrameConsumer;
import org.photonvision.vision.pipeline.AdvancedPipelineSettings;
import org.photonvision.vision.pipeline.OutputStreamPipeline;
import org.photonvision.vision.pipeline.ReflectivePipelineSettings;
@@ -48,6 +47,8 @@ import org.photonvision.vision.pipeline.UICalibrationData;
import org.photonvision.vision.pipeline.result.CVPipelineResult;
import org.photonvision.vision.target.TargetModel;
import org.photonvision.vision.target.TrackedTarget;
import org.photonvision.vision.videoStream.SocketVideoStream;
import org.photonvision.vision.videoStream.SocketVideoStreamManager;
/**
* This is the God Class
@@ -56,32 +57,31 @@ import org.photonvision.vision.target.TrackedTarget;
* provide info on settings changes. VisionModuleManager holds a list of all current vision modules.
*/
public class VisionModule {
private static final int streamFPSCap = 30;
private final Logger logger;
protected final PipelineManager pipelineManager;
protected final VisionSource visionSource;
private final VisionRunner visionRunner;
private final StreamRunnable streamRunnable;
private final LinkedList<CVPipelineResultConsumer> resultConsumers = new LinkedList<>();
private final LinkedList<CVPipelineResultConsumer> fpsLimitedResultConsumers = new LinkedList<>();
// Raw result consumers run before any drawing has been done by the OutputStreamPipeline
private final LinkedList<TriConsumer<Frame, Frame, List<TrackedTarget>>> rawResultConsumers =
private final LinkedList<TriConsumer<Frame, Frame, List<TrackedTarget>>> streamResultConsumers =
new LinkedList<>();
private final NTDataPublisher ntConsumer;
private final UIDataPublisher uiDataConsumer;
protected final int moduleIndex;
protected final QuirkyCamera cameraQuirks;
private long lastFrameConsumeMillis;
protected TrackedTarget lastPipelineResultBestTarget;
MJPGFrameConsumer dashboardInputStreamer;
MJPGFrameConsumer dashboardOutputStreamer;
private int inputStreamPort = -1;
private int outputStreamPort = -1;
FileSaveFrameConsumer inputFrameSaver;
FileSaveFrameConsumer outputFrameSaver;
SocketVideoStream inputVideoStreamer;
SocketVideoStream outputVideoStreamer;
public VisionModule(PipelineManager pipelineManager, VisionSource visionSource, int index) {
logger =
new Logger(
@@ -129,7 +129,7 @@ public class VisionModule {
createStreams();
recreateFpsLimitedResultConsumers();
recreateStreamResultConsumers();
ntConsumer =
new NTDataPublisher(
@@ -166,49 +166,33 @@ public class VisionModule {
}
private void destroyStreams() {
dashboardInputStreamer.close();
dashboardOutputStreamer.close();
SocketVideoStreamManager.getInstance().removeStream(inputVideoStreamer);
SocketVideoStreamManager.getInstance().removeStream(outputVideoStreamer);
}
private void createStreams() {
var camStreamIdx = visionSource.getSettables().getConfiguration().streamIndex;
// If idx = 0, we want (1181, 1182)
var inputStreamPort = 1181 + (camStreamIdx * 2);
var outputStreamPort = 1181 + (camStreamIdx * 2) + 1;
dashboardOutputStreamer =
new MJPGFrameConsumer(
visionSource.getSettables().getConfiguration().nickname + "-output", outputStreamPort);
dashboardInputStreamer =
new MJPGFrameConsumer(
visionSource.getSettables().getConfiguration().uniqueName + "-input", inputStreamPort);
this.inputStreamPort = 1181 + (camStreamIdx * 2);
this.outputStreamPort = 1181 + (camStreamIdx * 2) + 1;
inputFrameSaver =
new FileSaveFrameConsumer(visionSource.getSettables().getConfiguration().nickname, "input");
outputFrameSaver =
new FileSaveFrameConsumer(
visionSource.getSettables().getConfiguration().nickname, "output");
inputVideoStreamer = new SocketVideoStream(this.inputStreamPort);
outputVideoStreamer = new SocketVideoStream(this.outputStreamPort);
SocketVideoStreamManager.getInstance().addStream(inputVideoStreamer);
SocketVideoStreamManager.getInstance().addStream(outputVideoStreamer);
}
private void recreateFpsLimitedResultConsumers() {
// Important! These must come before the stream result consumers because the stream result
// consumers release the frame
rawResultConsumers.add((in, out, tgts) -> inputFrameSaver.accept(in));
fpsLimitedResultConsumers.add(result -> outputFrameSaver.accept(result.outputFrame));
fpsLimitedResultConsumers.add(
result -> {
if (this.pipelineManager.getCurrentPipelineSettings().inputShouldShow)
dashboardInputStreamer.accept(result.inputFrame);
else dashboardInputStreamer.disabledTick();
});
fpsLimitedResultConsumers.add(
result -> {
if (this.pipelineManager.getCurrentPipelineSettings().outputShouldShow)
dashboardOutputStreamer.accept(result.outputFrame);
else dashboardInputStreamer.disabledTick();
;
});
private void recreateStreamResultConsumers() {
streamResultConsumers.add((in, out, tgts) -> inputFrameSaver.accept(in));
streamResultConsumers.add((in, out, tgts) -> outputFrameSaver.accept(out));
streamResultConsumers.add((in, out, tgts) -> inputVideoStreamer.accept(in));
streamResultConsumers.add((in, out, tgts) -> outputVideoStreamer.accept(out));
}
private class StreamRunnable extends Thread {
@@ -270,12 +254,11 @@ public class VisionModule {
this.shouldRun = false;
}
if (shouldRun) {
consumeRawResults(inputFrame, outputFrame, targets);
try {
CVPipelineResult osr =
outputStreamPipeline.process(inputFrame, outputFrame, settings, targets);
consumeResults(inputFrame, osr.outputFrame, targets);
consumeFpsLimitedResult(osr);
} catch (Exception e) {
// Never die
logger.error("Exception while running stream runnable!", e);
@@ -473,14 +456,14 @@ public class VisionModule {
outputFrameSaver.updateCameraNickname(newName);
// Rename streams
fpsLimitedResultConsumers.clear();
streamResultConsumers.clear();
// Teardown and recreate streams
destroyStreams();
createStreams();
// Rebuild streamers
recreateFpsLimitedResultConsumers();
recreateStreamResultConsumers();
// Push new data to the UI
saveAndBroadcastAll();
@@ -515,8 +498,8 @@ public class VisionModule {
temp.put(k, internalMap);
}
ret.videoFormatList = temp;
ret.outputStreamPort = dashboardOutputStreamer.getCurrentStreamPort();
ret.inputStreamPort = dashboardInputStreamer.getCurrentStreamPort();
ret.outputStreamPort = this.outputStreamPort;
ret.inputStreamPort = this.inputStreamPort;
var calList = new ArrayList<HashMap<String, Object>>();
for (var c : visionSource.getSettables().getConfiguration().calibrations) {
@@ -566,7 +549,7 @@ public class VisionModule {
result.targets);
// The streamRunnable manages releasing in this case
} else {
consumeFpsLimitedResult(result);
consumeResults(result.inputFrame, result.outputFrame, result.targets);
result.release();
// In this case we don't bother with a separate streaming thread and we release
@@ -579,19 +562,9 @@ public class VisionModule {
}
}
private void consumeFpsLimitedResult(CVPipelineResult result) {
long dt = System.currentTimeMillis() - lastFrameConsumeMillis;
if (dt > 1000 / streamFPSCap) {
for (var c : fpsLimitedResultConsumers) {
c.accept(result);
}
lastFrameConsumeMillis = System.currentTimeMillis();
}
}
/** Consume results prior to drawing on them. */
private void consumeRawResults(Frame inputFrame, Frame outputFrame, List<TrackedTarget> targets) {
for (var c : rawResultConsumers) {
/** Consume stream/target results, no rate limiting applied */
private void consumeResults(Frame inputFrame, Frame outputFrame, List<TrackedTarget> targets) {
for (var c : streamResultConsumers) {
c.accept(inputFrame, outputFrame, targets);
}
}

View File

@@ -0,0 +1,116 @@
/*
* Copyright (C) Photon Vision.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.photonvision.vision.videoStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.opencv.core.MatOfByte;
import org.opencv.core.MatOfInt;
import org.opencv.imgcodecs.Imgcodecs;
import org.photonvision.vision.frame.Frame;
import org.photonvision.vision.frame.consumer.MJPGFrameConsumer;
public class SocketVideoStream implements Consumer<Frame> {
int portID = 0; // Align with cscore's port for unique identification of stream
MatOfByte jpegBytes = null;
// Gets set to true when another class reads out valid jpeg bytes at least once
// Set back to false when another frame is freshly converted
// Should eliminate synchronization issues of differeing rates of putting frames in
// and taking them back out
boolean frameWasConsumed = false;
// Synclock around manipulating the jpeg bytes from multiple threads
Lock jpegBytesLock = new ReentrantLock();
MJPGFrameConsumer oldSchoolServer;
private int userCount = 0;
public SocketVideoStream(int portID) {
this.portID = portID;
oldSchoolServer =
new MJPGFrameConsumer("Port_" + Integer.toString(portID) + "_MJPEG_Server", portID);
}
@Override
public void accept(Frame frame) {
if (userCount > 0) {
if (jpegBytesLock
.tryLock()) { // we assume frames are coming in frequently. Just skip this frame if we're
// locked doing something else.
try {
// Does a single-shot frame recieve and convert to JPEG for efficency
// Will not capture/convert again until convertNextFrame() is called
if (frame != null && !frame.image.getMat().empty() && jpegBytes == null) {
frameWasConsumed = false;
jpegBytes = new MatOfByte();
Imgcodecs.imencode(
".jpg",
frame.image.getMat(),
jpegBytes,
new MatOfInt(Imgcodecs.IMWRITE_JPEG_QUALITY, 75));
}
} finally {
jpegBytesLock.unlock();
}
}
}
oldSchoolServer.accept(frame);
}
public String getJPEGBase64EncodedStr() {
String sendStr = null;
jpegBytesLock.lock();
if (jpegBytes != null) {
sendStr = Base64.getEncoder().encodeToString(jpegBytes.toArray());
}
jpegBytesLock.unlock();
return sendStr;
}
public ByteBuffer getJPEGByteBuffer() {
ByteBuffer sendStr = null;
jpegBytesLock.lock();
if (jpegBytes != null) {
sendStr = ByteBuffer.wrap(jpegBytes.toArray());
}
jpegBytesLock.unlock();
return sendStr;
}
public void convertNextFrame() {
jpegBytesLock.lock();
if (jpegBytes != null) {
jpegBytes.release();
jpegBytes = null;
}
jpegBytesLock.unlock();
}
public void addUser() {
userCount++;
}
public void removeUser() {
userCount--;
}
}

View File

@@ -0,0 +1,97 @@
/*
* Copyright (C) Photon Vision.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.photonvision.vision.videoStream;
import io.javalin.websocket.WsContext;
import java.nio.ByteBuffer;
import java.util.Hashtable;
import java.util.Map;
import org.photonvision.common.logging.LogGroup;
import org.photonvision.common.logging.Logger;
public class SocketVideoStreamManager {
private static final int NO_STREAM_PORT = -1;
private final Logger logger = new Logger(SocketVideoStreamManager.class, LogGroup.Camera);
private Map<Integer, SocketVideoStream> streams = new Hashtable<Integer, SocketVideoStream>();
private Map<WsContext, Integer> userSubscriptions = new Hashtable<WsContext, Integer>();
private static class ThreadSafeSingleton {
private static final SocketVideoStreamManager INSTANCE = new SocketVideoStreamManager();
}
public static SocketVideoStreamManager getInstance() {
return ThreadSafeSingleton.INSTANCE;
}
private SocketVideoStreamManager() {}
// Register a new available camera stream
public void addStream(SocketVideoStream newStream) {
streams.put(newStream.portID, newStream);
logger.debug("Added new stream for port " + Integer.toString(newStream.portID));
}
// Remove a previously-added camera stream, and unsubscribe all users
public void removeStream(SocketVideoStream oldStream) {
streams.remove(oldStream.portID);
logger.debug("Removed stream for port " + Integer.toString(oldStream.portID));
}
// Indicate a user would like to subscribe to a camera stream and get frames from it periodically
public void addSubscription(WsContext user, int streamPortID) {
var stream = streams.get(streamPortID);
if (stream != null) {
userSubscriptions.put(user, streamPortID);
stream.addUser();
} else {
logger.error(
"User attempted to subscribe to non-existent port " + Integer.toString(streamPortID));
}
}
// Indicate a user would like to stop receiving one camera stream
public void removeSubscription(WsContext user) {
var port = userSubscriptions.get(user);
if (port != null) {
var stream = streams.get(port);
userSubscriptions.put(user, NO_STREAM_PORT);
stream.removeUser();
}
}
// For a given user, return the jpeg bytes (or null) for the most recent frame
public ByteBuffer getSendFrame(WsContext user) {
var port = userSubscriptions.get(user);
if (port != null && port != NO_STREAM_PORT) {
var stream = streams.get(port);
return stream.getJPEGByteBuffer();
} else {
return null;
}
}
// Causes all streams to "re-trigger" and recieve and convert their next mjpeg frame
// Only invoke this after all returned jpeg Strings have been used.
public void allStreamConvertNextFrame() {
for (SocketVideoStream stream : streams.values()) {
stream.convertNextFrame();
}
}
}