Remove socket camera streaming (#985)

Removes websocket-based camera streaming functionality. 

Fixes #975. This was caused by destroying the camera streams and recreating them on nickname change. Even when directly using `MJPGFrameConsumer` and the streams were exactly the same, the freeze would occur when creating a new `MjpegServer` and require a refresh. I think this is simply how cscore works?
This commit is contained in:
amquake
2023-10-29 20:03:05 -07:00
committed by GitHub
parent 0898dfe2f7
commit 76e3c6d5a5
8 changed files with 30 additions and 474 deletions

View File

@@ -28,12 +28,18 @@ import org.opencv.core.Point;
import org.opencv.core.Rect;
import org.opencv.imgproc.Imgproc;
import org.photonvision.common.util.ColorHelper;
import org.photonvision.common.util.math.MathUtils;
import org.photonvision.vision.opencv.CVMat;
public class MJPGFrameConsumer {
public static final Mat EMPTY_MAT = new Mat(60, 15 * 7, CvType.CV_8UC3);
public class MJPGFrameConsumer implements AutoCloseable {
private static final double MAX_FRAMERATE = -1;
private static final long MAX_FRAME_PERIOD_NS = Math.round(1e9 / MAX_FRAMERATE);
private long lastFrameTimeNs;
private static final Mat EMPTY_MAT = new Mat(60, 15 * 7, CvType.CV_8UC3);
private static final double EMPTY_FRAMERATE = 2;
private long lastEmptyTime;
private static final long EMPTY_FRAME_PERIOD_NS = Math.round(1e9 / EMPTY_FRAMERATE);
private long lastEmptyTimeNs;
static {
EMPTY_MAT.setTo(ColorHelper.colorToScalar(Color.BLACK));
@@ -168,11 +174,15 @@ public class MJPGFrameConsumer {
public void accept(CVMat image) {
if (image != null && !image.getMat().empty()) {
cvSource.putFrame(image.getMat());
long now = MathUtils.wpiNanoTime();
if (now - lastFrameTimeNs > MAX_FRAME_PERIOD_NS) {
lastFrameTimeNs = now;
cvSource.putFrame(image.getMat());
}
// Make sure our disabled framerate limiting doesn't get confused
isDisabled = false;
lastEmptyTime = 0;
lastEmptyTimeNs = 0;
}
}
@@ -182,9 +192,10 @@ public class MJPGFrameConsumer {
isDisabled = true;
}
if (System.currentTimeMillis() - lastEmptyTime > 1000.0 / EMPTY_FRAMERATE) {
long now = MathUtils.wpiNanoTime();
if (now - lastEmptyTimeNs > EMPTY_FRAME_PERIOD_NS) {
lastEmptyTimeNs = now;
cvSource.putFrame(EMPTY_MAT);
lastEmptyTime = System.currentTimeMillis();
}
}
@@ -233,6 +244,7 @@ public class MJPGFrameConsumer {
}
}
@Override
public void close() {
table.getEntry("connected").setBoolean(false);
mjpegServer.close();

View File

@@ -17,6 +17,7 @@
package org.photonvision.vision.processes;
import edu.wpi.first.cscore.CameraServerJNI;
import edu.wpi.first.cscore.VideoException;
import edu.wpi.first.math.util.Units;
import io.javalin.websocket.WsContext;
@@ -44,6 +45,7 @@ import org.photonvision.vision.camera.QuirkyCamera;
import org.photonvision.vision.camera.USBCameraSource;
import org.photonvision.vision.frame.Frame;
import org.photonvision.vision.frame.consumer.FileSaveFrameConsumer;
import org.photonvision.vision.frame.consumer.MJPGFrameConsumer;
import org.photonvision.vision.pipeline.AdvancedPipelineSettings;
import org.photonvision.vision.pipeline.OutputStreamPipeline;
import org.photonvision.vision.pipeline.ReflectivePipelineSettings;
@@ -51,8 +53,6 @@ import org.photonvision.vision.pipeline.UICalibrationData;
import org.photonvision.vision.pipeline.result.CVPipelineResult;
import org.photonvision.vision.target.TargetModel;
import org.photonvision.vision.target.TrackedTarget;
import org.photonvision.vision.videoStream.SocketVideoStream;
import org.photonvision.vision.videoStream.SocketVideoStreamManager;
/**
* This is the God Class
@@ -83,8 +83,8 @@ public class VisionModule {
FileSaveFrameConsumer inputFrameSaver;
FileSaveFrameConsumer outputFrameSaver;
SocketVideoStream inputVideoStreamer;
SocketVideoStream outputVideoStreamer;
MJPGFrameConsumer inputVideoStreamer;
MJPGFrameConsumer outputVideoStreamer;
public VisionModule(PipelineManager pipelineManager, VisionSource visionSource, int index) {
logger =
@@ -169,11 +169,6 @@ public class VisionModule {
saveAndBroadcastAll();
}
private void destroyStreams() {
SocketVideoStreamManager.getInstance().removeStream(inputVideoStreamer);
SocketVideoStreamManager.getInstance().removeStream(outputVideoStreamer);
}
private void createStreams() {
var camStreamIdx = visionSource.getSettables().getConfiguration().streamIndex;
// If idx = 0, we want (1181, 1182)
@@ -186,10 +181,13 @@ public class VisionModule {
new FileSaveFrameConsumer(
visionSource.getSettables().getConfiguration().nickname, "output");
inputVideoStreamer = new SocketVideoStream(this.inputStreamPort);
outputVideoStreamer = new SocketVideoStream(this.outputStreamPort);
SocketVideoStreamManager.getInstance().addStream(inputVideoStreamer);
SocketVideoStreamManager.getInstance().addStream(outputVideoStreamer);
String camHostname = CameraServerJNI.getHostname();
inputVideoStreamer =
new MJPGFrameConsumer(
camHostname + "_Port_" + inputStreamPort + "_Input_MJPEG_Server", inputStreamPort);
outputVideoStreamer =
new MJPGFrameConsumer(
camHostname + "_Port_" + outputStreamPort + "_Output_MJPEG_Server", outputStreamPort);
}
private void recreateStreamResultConsumers() {
@@ -483,16 +481,6 @@ public class VisionModule {
inputFrameSaver.updateCameraNickname(newName);
outputFrameSaver.updateCameraNickname(newName);
// Rename streams
streamResultConsumers.clear();
// Teardown and recreate streams
destroyStreams();
createStreams();
// Rebuild streamers
recreateStreamResultConsumers();
// Push new data to the UI
saveAndBroadcastAll();
}

View File

@@ -1,117 +0,0 @@
/*
* Copyright (C) Photon Vision.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.photonvision.vision.videoStream;
import edu.wpi.first.cscore.CameraServerJNI;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.opencv.core.MatOfByte;
import org.opencv.core.MatOfInt;
import org.opencv.imgcodecs.Imgcodecs;
import org.photonvision.common.util.math.MathUtils;
import org.photonvision.vision.frame.consumer.MJPGFrameConsumer;
import org.photonvision.vision.opencv.CVMat;
public class SocketVideoStream implements Consumer<CVMat> {
int portID = 0; // Align with cscore's port for unique identification of stream
MatOfByte jpegBytes = null;
// Gets set to true when another class reads out valid jpeg bytes at least once
// Set back to false when another frame is freshly converted
// Should eliminate synchronization issues of differing rates of putting frames in
// and taking them back out
boolean frameWasConsumed = false;
// Synclock around manipulating the jpeg bytes from multiple threads
Lock jpegBytesLock = new ReentrantLock();
private int userCount = 0;
// FPS-limited MJPEG sender
private final double FPS_MAX = 30.0;
private final long minFramePeriodNanos = Math.round(1000000000.0 / FPS_MAX);
private long nextFrameSendTime = MathUtils.wpiNanoTime() + minFramePeriodNanos;
MJPGFrameConsumer oldSchoolServer;
public SocketVideoStream(int portID) {
this.portID = portID;
oldSchoolServer =
new MJPGFrameConsumer(
CameraServerJNI.getHostname() + "_Port_" + portID + "_MJPEG_Server", portID);
}
@Override
public void accept(CVMat image) {
if (userCount > 0) {
if (jpegBytesLock
.tryLock()) { // we assume frames are coming in frequently. Just skip this frame if we're
// locked doing something else.
try {
// Does a single-shot frame receive and convert to JPEG for efficiency
// Will not capture/convert again until convertNextFrame() is called
if (image != null && !image.getMat().empty() && jpegBytes == null) {
frameWasConsumed = false;
jpegBytes = new MatOfByte();
Imgcodecs.imencode(
".jpg",
image.getMat(),
jpegBytes,
new MatOfInt(Imgcodecs.IMWRITE_JPEG_QUALITY, 75));
}
} finally {
jpegBytesLock.unlock();
}
}
}
// Send the frame in an FPS-limited fashion
var now = MathUtils.wpiNanoTime();
if (now > nextFrameSendTime) {
oldSchoolServer.accept(image);
nextFrameSendTime = now + minFramePeriodNanos;
}
}
public ByteBuffer getJPEGByteBuffer() {
ByteBuffer sendStr = null;
jpegBytesLock.lock();
if (jpegBytes != null) {
sendStr = ByteBuffer.wrap(jpegBytes.toArray());
}
jpegBytesLock.unlock();
return sendStr;
}
public void convertNextFrame() {
jpegBytesLock.lock();
if (jpegBytes != null) {
jpegBytes.release();
jpegBytes = null;
}
jpegBytesLock.unlock();
}
public void addUser() {
userCount++;
}
public void removeUser() {
userCount--;
}
}

View File

@@ -1,101 +0,0 @@
/*
* Copyright (C) Photon Vision.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.photonvision.vision.videoStream;
import io.javalin.websocket.WsContext;
import java.nio.ByteBuffer;
import java.util.Hashtable;
import java.util.Map;
import org.photonvision.common.logging.LogGroup;
import org.photonvision.common.logging.Logger;
public class SocketVideoStreamManager {
private static final int NO_STREAM_PORT = -1;
private final Logger logger = new Logger(SocketVideoStreamManager.class, LogGroup.Camera);
private final Map<Integer, SocketVideoStream> streams = new Hashtable<>();
private final Map<WsContext, Integer> userSubscriptions = new Hashtable<>();
private static class ThreadSafeSingleton {
private static final SocketVideoStreamManager INSTANCE = new SocketVideoStreamManager();
}
public static SocketVideoStreamManager getInstance() {
return ThreadSafeSingleton.INSTANCE;
}
private SocketVideoStreamManager() {}
// Register a new available camera stream
public void addStream(SocketVideoStream newStream) {
streams.put(newStream.portID, newStream);
logger.debug("Added new stream for port " + newStream.portID);
}
// Remove a previously-added camera stream, and unsubscribe all users
public void removeStream(SocketVideoStream oldStream) {
streams.remove(oldStream.portID);
logger.debug("Removed stream for port " + oldStream.portID);
}
// Indicate a user would like to subscribe to a camera stream and get frames from it periodically
public void addSubscription(WsContext user, int streamPortID) {
var stream = streams.get(streamPortID);
if (stream != null) {
userSubscriptions.put(user, streamPortID);
stream.addUser();
} else {
logger.error("User attempted to subscribe to non-existent port " + streamPortID);
}
}
// Indicate a user would like to stop receiving one camera stream
public void removeSubscription(WsContext user) {
var port = userSubscriptions.get(user);
if (port != null && port != NO_STREAM_PORT) {
var stream = streams.get(port);
userSubscriptions.put(user, NO_STREAM_PORT);
if (stream != null) {
stream.removeUser();
}
} else {
logger.error(
"User attempted to unsubscribe, but had not yet previously subscribed successfully.");
}
}
// For a given user, return the jpeg bytes (or null) for the most recent frame
public ByteBuffer getSendFrame(WsContext user) {
var port = userSubscriptions.get(user);
if (port != null && port != NO_STREAM_PORT) {
var stream = streams.get(port);
return stream.getJPEGByteBuffer();
} else {
return null;
}
}
// Causes all streams to "re-trigger" and receive and convert their next mjpeg frame
// Only invoke this after all returned jpeg Strings have been used.
public void allStreamConvertNextFrame() {
for (SocketVideoStream stream : streams.values()) {
stream.convertNextFrame();
}
}
}

View File

@@ -1,143 +0,0 @@
/*
* Copyright (C) Photon Vision.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.photonvision.server;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.javalin.websocket.WsBinaryMessageContext;
import io.javalin.websocket.WsCloseContext;
import io.javalin.websocket.WsConnectContext;
import io.javalin.websocket.WsContext;
import io.javalin.websocket.WsMessageContext;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.photonvision.common.logging.LogGroup;
import org.photonvision.common.logging.Logger;
import org.photonvision.vision.videoStream.SocketVideoStreamManager;
public class CameraSocketHandler {
private final Logger logger = new Logger(CameraSocketHandler.class, LogGroup.WebServer);
private final List<WsContext> users = new CopyOnWriteArrayList<>();
private final SocketVideoStreamManager svsManager = SocketVideoStreamManager.getInstance();
private Thread cameraBroadcastThread;
public static class UIMap extends HashMap<String, Object> {}
private static class ThreadSafeSingleton {
private static final CameraSocketHandler INSTANCE = new CameraSocketHandler();
}
public static CameraSocketHandler getInstance() {
return CameraSocketHandler.ThreadSafeSingleton.INSTANCE;
}
private CameraSocketHandler() {
cameraBroadcastThread = new Thread(this::broadcastFramesTask);
cameraBroadcastThread.setPriority(Thread.MAX_PRIORITY - 3); // fairly high priority
cameraBroadcastThread.start();
}
public void onConnect(WsConnectContext context) {
context.session.setIdleTimeout(
Duration.ofMillis(Long.MAX_VALUE)); // TODO: determine better value
var remote = (InetSocketAddress) context.session.getRemoteAddress();
var host = remote.getAddress().toString() + ":" + remote.getPort();
logger.info("New camera websocket connection from " + host);
users.add(context);
}
protected void onClose(WsCloseContext context) {
var remote = (InetSocketAddress) context.session.getRemoteAddress();
var host = remote.getAddress().toString() + ":" + remote.getPort();
var reason = context.reason() != null ? context.reason() : "Connection closed by client";
logger.info("Closing camera websocket connection from " + host + " for reason: " + reason);
svsManager.removeSubscription(context);
users.remove(context);
}
@SuppressWarnings({"unchecked"})
public void onMessage(WsMessageContext context) {
var messageStr = context.message();
ObjectMapper mapper = new ObjectMapper();
try {
JsonNode actualObj = mapper.readTree(messageStr);
try {
var entryCmd = actualObj.get("cmd").asText();
var socketMessageType = CameraSocketMessageType.fromEntryKey(entryCmd);
logger.trace(() -> "Got Camera WS message: [" + socketMessageType + "]");
if (socketMessageType == null) {
logger.warn("Got unknown socket message command: " + entryCmd);
}
switch (socketMessageType) {
case CSMT_SUBSCRIBE:
{
int portId = actualObj.get("port").asInt();
svsManager.addSubscription(context, portId);
break;
}
case CSMT_UNSUBSCRIBE:
{
svsManager.removeSubscription(context);
break;
}
}
} catch (Exception e) {
logger.error("Failed to parse message!", e);
}
} catch (JsonProcessingException e) {
logger.warn("Could not parse message \"" + messageStr + "\"");
e.printStackTrace();
return;
}
}
@SuppressWarnings({"unchecked"})
public void onBinaryMessage(WsBinaryMessageContext context) {
return; // ignoring binary messages for now
}
private void broadcastFramesTask() {
// Background camera image broadcasting thread
while (!Thread.currentThread().isInterrupted()) {
svsManager.allStreamConvertNextFrame();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
logger.error("Exception waiting for camera stream broadcast semaphore", e);
}
for (var user : users) {
var sendBytes = svsManager.getSendFrame(user);
if (sendBytes != null) {
user.send(sendBytes);
}
}
}
}
}

View File

@@ -1,46 +0,0 @@
/*
* Copyright (C) Photon Vision.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.photonvision.server;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@SuppressWarnings("unused")
public enum CameraSocketMessageType {
CSMT_SUBSCRIBE("subscribe"),
CSMT_UNSUBSCRIBE("unsubscribe");
public final String entryKey;
CameraSocketMessageType(String entryKey) {
this.entryKey = entryKey;
}
private static final Map<String, CameraSocketMessageType> entryKeyToValueMap = new HashMap<>();
static {
for (var value : EnumSet.allOf(CameraSocketMessageType.class)) {
entryKeyToValueMap.put(value.entryKey, value);
}
}
public static CameraSocketMessageType fromEntryKey(String entryKey) {
return entryKeyToValueMap.get(entryKey);
}
}

View File

@@ -79,17 +79,6 @@ public class Server {
ws.onBinaryMessage(dsHandler::onBinaryMessage);
});
/*Web Socket Events for Camera Streaming */
var camDsHandler = CameraSocketHandler.getInstance();
app.ws(
"/websocket_cameras",
ws -> {
ws.onConnect(camDsHandler::onConnect);
ws.onClose(camDsHandler::onClose);
ws.onBinaryMessage(camDsHandler::onBinaryMessage);
ws.onMessage(camDsHandler::onMessage);
});
/*API Events*/
// Settings
app.post("/api/settings", RequestHandler::onSettingsImportRequest);

View File

@@ -1,26 +0,0 @@
/*
* Copyright (C) Photon Vision.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.photonvision.server;
import org.photonvision.common.configuration.CameraConfiguration;
import org.photonvision.common.configuration.NetworkConfig;
public class UISettings {
public NetworkConfig networkConfig;
public CameraConfiguration currentCameraConfiguration;
}