From a64697e714e38d2c066d91a1eda5f4d8fe5b2f34 Mon Sep 17 00:00:00 2001 From: Chris Gerth Date: Thu, 3 Nov 2022 15:05:17 -0500 Subject: [PATCH] Added proper state machine to websocket video stream to control connect/disconnect sequence better. (#561) --- .../src/components/common/cv-image.vue | 7 +- .../src/plugins/WebsocketVideoStream.js | 192 +++++++++++++----- .../videoStream/SocketVideoStreamManager.java | 5 +- photon-thinclient/thinclient.html | 186 ++++++++++++----- 4 files changed, 290 insertions(+), 100 deletions(-) diff --git a/photon-client/src/components/common/cv-image.vue b/photon-client/src/components/common/cv-image.vue index 240e5a89c..72e9fb1bf 100644 --- a/photon-client/src/components/common/cv-image.vue +++ b/photon-client/src/components/common/cv-image.vue @@ -65,9 +65,9 @@ disconnected(newVal, oldVal){ oldVal; if(newVal){ - this.wsStream.stopStream(); + this.wsStream.setPort(0); } else { - this.wsStream.startStream(); + this.wsStream.setPort(this.port); } } }, @@ -76,8 +76,7 @@ this.wsStream = new wsvs.WebsocketVideoStream(this.id, this.port, window.location.host); }, unmounted() { - this.wsStream.stopStream(); - this.wsStream.ws_close(); + this.wsStream.setPort(0); }, methods: { reload() { diff --git a/photon-client/src/plugins/WebsocketVideoStream.js b/photon-client/src/plugins/WebsocketVideoStream.js index 7a409e8dd..351383405 100644 --- a/photon-client/src/plugins/WebsocketVideoStream.js +++ b/photon-client/src/plugins/WebsocketVideoStream.js @@ -8,91 +8,184 @@ export class WebsocketVideoStream{ this.drawDiv = drawDiv; this.image = document.getElementById(this.drawDiv); this.streamPort = streamPort; + this.newStreamPortReq = null; this.serverAddr = "ws://" + host + "/websocket_cameras"; - this.noStream = false; - this.noStreamPrev = false; - this.setNoStream(); + this.dispNoStream(); this.ws_connect(); this.imgData = null; this.imgDataTime = -1; this.imgObjURL = null; this.frameRxCount = 0; + //Display state machine + this.DSM_DISCONNECTED = "DISCONNECTED"; + this.DSM_WAIT_FOR_VALID_PORT = "WAIT_FOR_VALID_PORT"; + this.DSM_SUBSCRIBE = "SUBSCRIBE"; + this.DSM_WAIT_FOR_FIRST_FRAME = "WAIT_FOR_FIRST_FRAME"; + this.DSM_SHOWING = "SHOWING"; + this.DSM_RESTART_UNSUBSCRIBE = "UNSUBSCRIBE"; + this.DSM_RESTART_WAIT = "WAIT_BEFORE_SUBSCRIBE"; + + this.dsm_cur_state = this.DSM_DISCONNECTED; + this.dsm_prev_state = this.DSM_DISCONNECTED; + this.dsm_restart_start_time = window.performance.now(); + requestAnimationFrame(()=>this.animationLoop()); } + dispImageData(){ + //From https://stackoverflow.com/questions/67507616/set-image-src-from-image-blob/67507685#67507685 + if(this.imgObjURL != null){ + URL.revokeObjectURL(this.imgObjURL) + } + this.imgObjURL = URL.createObjectURL(this.imgData); + + //Update the image with the new mimetype and image + this.image.src = this.imgObjURL; + } + + dispNoStream() { + this.image.src = require("../assets/loading.gif"); + } + animationLoop(){ + // Update time metrics var now = window.performance.now(); + var timeInState = now - this.dsm_restart_start_time; - if((now - this.imgDataTime) > 2500 && this.imgData != null){ - //Handle websocket send timeouts by restarting - this.setNoStream(); - this.stopStream(); - setTimeout(this.startStream.bind(this), 1000); //restart stream one second later + // Save previous state + this.dsm_prev_state = this.dsm_cur_state; + + // Evaluate state transitions + if(this.serverConnectionActive == false){ + //Any state - if the server connection goes false, always transition to disconnected + this.dsm_cur_state = this.DSM_DISCONNECTED; } else { - if(this.streamPort == null){ - this.setNoStream(); - } else if (this.imgData != null) { - //From https://stackoverflow.com/questions/67507616/set-image-src-from-image-blob/67507685#67507685 - if(this.imgObjURL != null){ - URL.revokeObjectURL(this.imgObjURL) - } - this.imgObjURL = URL.createObjectURL(this.imgData); - - //Update the image with the new mimetype and image - this.image.src = this.imgObjURL; - this.noStream = false; - - } else { - //Nothing, hold previous image while waiting for next frame - } + //Conditional transitions + switch(this.dsm_cur_state) { + case this.DSM_DISCONNECTED: + //Immediately transition to waiting for the first frame + this.dsm_cur_state = this.DSM_WAIT_FOR_VALID_PORT; + break; + case this.DSM_WAIT_FOR_VALID_PORT: + // Wait until the user has configured a valid port + if(this.streamPort > 0){ + this.dsm_cur_state = this.DSM_SUBSCRIBE; + } else { + this.dsm_cur_state = this.DSM_WAIT_FOR_VALID_PORT; + } + break; + case this.DSM_SUBSCRIBE: + // Immediately transition after subscriptions is sent + this.dsm_cur_state = this.DSM_WAIT_FOR_FIRST_FRAME; + break; + case this.DSM_WAIT_FOR_FIRST_FRAME: + if(this.imgData != null){ + //we got some image data, start showing it + this.dsm_cur_state = this.DSM_SHOWING; + } else if (this.newStreamPortReq != null){ + //Stream port requested changed, unsubscribe and restart + this.dsm_cur_state = this.DSM_RESTART_UNSUBSCRIBE; + } else { + this.dsm_cur_state = this.DSM_WAIT_FOR_FIRST_FRAME; + } + break; + case this.DSM_SHOWING: + if((now - this.imgDataTime) > 2500){ + //timeout, begin the restart sequence + this.dsm_cur_state = this.DSM_RESTART_UNSUBSCRIBE; + } else if (this.newStreamPortReq != null){ + //Stream port requested changed, unsubscribe and restart + this.dsm_cur_state = this.DSM_RESTART_UNSUBSCRIBE; + } else { + //stay in this state. + this.dsm_cur_state = this.DSM_SHOWING; + } + break; + case this.DSM_RESTART_UNSUBSCRIBE: + //Only should spend one loop in Unsubscribe, immediately transition + this.dsm_cur_state = this.DSM_RESTART_WAIT; + break; + case this.DSM_RESTART_WAIT: + if (timeInState > 250) { + //we've waited long enough, go to try to re-subscribe + this.dsm_cur_state = this.DSM_WAIT_FOR_VALID_PORT; + } else { + //stay in this state. + this.dsm_cur_state = this.DSM_RESTART_WAIT; + } + break; + default: + // Shouldn't get here, default back to init + this.dsm_cur_state = this.DSM_DISCONNECTED; + } } + //take current-state or state-transition actions + + if(this.dsm_cur_state != this.dsm_prev_state){ + //Any state transition + console.log("State Change: " + this.dsm_prev_state + " -> " + this.dsm_cur_state); + } + + if(this.dsm_cur_state == this.DSM_SHOWING){ + // Currently in SHOWING + this.dispImageData(); + } + + if(this.dsm_cur_state != this.DSM_SHOWING && this.dsm_prev_state == this.DSM_SHOWING ){ + //Any transition out of showing - no stream + this.dispNoStream(); + } + + if(this.dsm_cur_state == this.DSM_RESTART_UNSUBSCRIBE){ + // Currently in UNSUBSCRIBE, do the unsubscribe actions + this.stopStream(); + this.dsm_restart_start_time = now; + } + + if(this.dsm_cur_state == this.DSM_SUBSCRIBE){ + // Currently in SUBSCRIBE, do the subscribe actions + this.startStream(); + this.dsm_restart_start_time = now; + } + + if(this.dsm_cur_state == this.DSM_WAIT_FOR_VALID_PORT){ + // Currently waiting for a vaild port to be requested + if(this.newStreamPortReq != null){ + this.streamPort = this.newStreamPortReq; + this.newStreamPortReq = null; + } + } requestAnimationFrame(()=>this.animationLoop()); } - setNoStream() { - this.noStreamPrev = this.noStream; - this.noStream = true; - if(this.noStreamPrev == false && this.noStream == true){ - //One-shot background change to preserve animation - this.image.src = require("../assets/loading.gif"); - } - } - startStream() { - if(this.serverConnectionActive == true && this.streamPort > 0){ - this.ws.send(JSON.stringify({"cmd": "subscribe", "port":this.streamPort})); - this.noStream = false; - } + console.log("Subscribing to port " + this.streamPort); + this.imgData = null; + this.ws.send(JSON.stringify({"cmd": "subscribe", "port":this.streamPort})); } stopStream() { - if(this.serverConnectionActive == true && this.streamPort > 0){ - this.ws.send(JSON.stringify({"cmd": "unsubscribe"})); - this.noStream = true; - } + console.log("Unsubscribing"); + this.ws.send(JSON.stringify({"cmd": "unsubscribe"})); + this.imgData = null; } setPort(streamPort){ - this.stopStream(); - this.frameRxCount = 0; - this.streamPort = streamPort; - this.startStream(); + console.log("Port set to " + streamPort); + this.newStreamPortReq = streamPort; } ws_onOpen() { // Set the flag allowing general server communication this.serverConnectionActive = true; - console.log("Connected!"); - this.startStream(); + console.log("Camera Websockets Connected!"); } ws_onClose(e) { - this.setNoStream(); - //Clear flags to stop server communication this.ws = null; this.serverConnectionActive = false; @@ -129,6 +222,7 @@ export class WebsocketVideoStream{ } ws_connect() { + this.serverConnectionActive = false; this.ws = new WebSocket(this.serverAddr); this.ws.binaryType = "blob"; this.ws.onopen = this.ws_onOpen.bind(this); diff --git a/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java b/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java index 2b1840bf3..a2a30b017 100644 --- a/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java +++ b/photon-core/src/main/java/org/photonvision/vision/videoStream/SocketVideoStreamManager.java @@ -69,10 +69,13 @@ public class SocketVideoStreamManager { // Indicate a user would like to stop receiving one camera stream public void removeSubscription(WsContext user) { var port = userSubscriptions.get(user); - if (port != null) { + if (port != null && port != NO_STREAM_PORT) { var stream = streams.get(port); userSubscriptions.put(user, NO_STREAM_PORT); stream.removeUser(); + } else { + logger.error( + "User attempted to unsubscribe, but had not yet previously subscribed successfully."); } } diff --git a/photon-thinclient/thinclient.html b/photon-thinclient/thinclient.html index c18140365..dd3fe670b 100644 --- a/photon-thinclient/thinclient.html +++ b/photon-thinclient/thinclient.html @@ -45,89 +45,183 @@ this.drawDiv = drawDiv; this.image = document.getElementById(this.drawDiv); this.streamPort = streamPort; + this.newStreamPortReq = null; this.serverAddr = "ws://" + host + "/websocket_cameras"; - this.noStream = false; - this.noStreamPrev = false; - this.setNoStream(); + this.dispNoStream(); this.ws_connect(); this.imgData = null; this.imgDataTime = -1; this.imgObjURL = null; this.frameRxCount = 0; + //Display state machine + this.DSM_DISCONNECTED = "DISCONNECTED"; + this.DSM_WAIT_FOR_VALID_PORT = "WAIT_FOR_VALID_PORT"; + this.DSM_SUBSCRIBE = "SUBSCRIBE"; + this.DSM_WAIT_FOR_FIRST_FRAME = "WAIT_FOR_FIRST_FRAME"; + this.DSM_SHOWING = "SHOWING"; + this.DSM_RESTART_UNSUBSCRIBE = "UNSUBSCRIBE"; + this.DSM_RESTART_WAIT = "WAIT_BEFORE_SUBSCRIBE"; + + this.dsm_cur_state = this.DSM_DISCONNECTED; + this.dsm_prev_state = this.DSM_DISCONNECTED; + this.dsm_restart_start_time = window.performance.now(); + requestAnimationFrame(()=>this.animationLoop()); } + dispImageData(){ + //From https://stackoverflow.com/questions/67507616/set-image-src-from-image-blob/67507685#67507685 + if(this.imgObjURL != null){ + URL.revokeObjectURL(this.imgObjURL) + } + this.imgObjURL = URL.createObjectURL(this.imgData); + + //Update the image with the new mimetype and image + this.image.src = this.imgObjURL; + } + + dispNoStream() { + this.image.src = "loading.gif"; + } + animationLoop(){ + // Update time metrics var now = window.performance.now(); + var timeInState = now - this.dsm_restart_start_time; - if((now - this.imgDataTime) > 2500 && this.imgData != null){ - //Handle websocket send timeouts by restarting - this.setNoStream(); - this.stopStream(); - setTimeout(this.startStream.bind(this), 1000); //restart stream one second later + // Save previous state + this.dsm_prev_state = this.dsm_cur_state; + + // Evaluate state transitions + if(this.serverConnectionActive == false){ + //Any state - if the server connection goes false, always transition to disconnected + this.dsm_cur_state = this.DSM_DISCONNECTED; } else { - if(this.streamPort == null){ - this.setNoStream(); - } else if (this.imgData != null) { - //From https://stackoverflow.com/questions/67507616/set-image-src-from-image-blob/67507685#67507685 - if(this.imgObjURL != null){ - URL.revokeObjectURL(this.imgObjURL) - } - this.imgObjURL = URL.createObjectURL(this.imgData); + //Conditional transitions + switch(this.dsm_cur_state) { + case this.DSM_DISCONNECTED: + //Immediately transition to waiting for the first frame + this.dsm_cur_state = this.DSM_WAIT_FOR_VALID_PORT; + break; + case this.DSM_WAIT_FOR_VALID_PORT: + // Wait until the user has configured a valid port + if(this.streamPort > 0){ + this.dsm_cur_state = this.DSM_SUBSCRIBE; + } else { + this.dsm_cur_state = this.DSM_WAIT_FOR_VALID_PORT; + } + break; + case this.DSM_SUBSCRIBE: + // Immediately transition after subscriptions is sent + this.dsm_cur_state = this.DSM_WAIT_FOR_FIRST_FRAME; + break; + case this.DSM_WAIT_FOR_FIRST_FRAME: + if(this.imgData != null){ + //we got some image data, start showing it + this.dsm_cur_state = this.DSM_SHOWING; + } else if (this.newStreamPortReq != null){ + //Stream port requested changed, unsubscribe and restart + this.dsm_cur_state = this.DSM_RESTART_UNSUBSCRIBE; + } else { + this.dsm_cur_state = this.DSM_WAIT_FOR_FIRST_FRAME; + } + break; + case this.DSM_SHOWING: + if((now - this.imgDataTime) > 2500){ + //timeout, begin the restart sequence + this.dsm_cur_state = this.DSM_RESTART_UNSUBSCRIBE; + } else if (this.newStreamPortReq != null){ + //Stream port requested changed, unsubscribe and restart + this.dsm_cur_state = this.DSM_RESTART_UNSUBSCRIBE; + } else { + //stay in this state. + this.dsm_cur_state = this.DSM_SHOWING; + } + break; + case this.DSM_RESTART_UNSUBSCRIBE: + //Only should spend one loop in Unsubscribe, immediately transition + this.dsm_cur_state = this.DSM_RESTART_WAIT; + break; + case this.DSM_RESTART_WAIT: + if (timeInState > 250) { + //we've waited long enough, go to try to re-subscribe + this.dsm_cur_state = this.DSM_WAIT_FOR_VALID_PORT; + } else { + //stay in this state. + this.dsm_cur_state = this.DSM_RESTART_WAIT; + } + break; + default: + // Shouldn't get here, default back to init + this.dsm_cur_state = this.DSM_DISCONNECTED; + } + } - //Update the image with the new mimetype and image - this.image.src = this.imgObjURL; - this.noStream = false; + //take current-state or state-transition actions - } else { - //Nothing, hold previous image while waiting for next frame + if(this.dsm_cur_state != this.dsm_prev_state){ + //Any state transition + console.log("State Change: " + this.dsm_prev_state + " -> " + this.dsm_cur_state); + } + + if(this.dsm_cur_state == this.DSM_SHOWING){ + // Currently in SHOWING + this.dispImageData(); + } + + if(this.dsm_cur_state != this.DSM_SHOWING && this.dsm_prev_state == this.DSM_SHOWING ){ + //Any transition out of showing - no stream + this.dispNoStream(); + } + + if(this.dsm_cur_state == this.DSM_RESTART_UNSUBSCRIBE){ + // Currently in UNSUBSCRIBE, do the unsubscribe actions + this.stopStream(); + this.dsm_restart_start_time = now; + } + + if(this.dsm_cur_state == this.DSM_SUBSCRIBE){ + // Currently in SUBSCRIBE, do the subscribe actions + this.startStream(); + this.dsm_restart_start_time = now; + } + + if(this.dsm_cur_state == this.DSM_WAIT_FOR_VALID_PORT){ + // Currently waiting for a vaild port to be requested + if(this.newStreamPortReq != null){ + this.streamPort = this.newStreamPortReq; + this.newStreamPortReq = null; } } requestAnimationFrame(()=>this.animationLoop()); } - setNoStream() { - this.noStreamPrev = this.noStream; - this.noStream = true; - if(this.noStreamPrev == false && this.noStream == true){ - //One-shot background change to preserve animation - this.image.src = "loading.gif"; - } - } - startStream() { - if(this.serverConnectionActive == true && this.streamPort > 0){ - this.ws.send(JSON.stringify({"cmd": "subscribe", "port":this.streamPort})); - this.noStream = false; - } + console.log("Subscribing to port " + this.streamPort); + this.imgData = null; + this.ws.send(JSON.stringify({"cmd": "subscribe", "port":this.streamPort})); } stopStream() { - if(this.serverConnectionActive == true && this.streamPort > 0){ - this.ws.send(JSON.stringify({"cmd": "unsubscribe"})); - this.noStream = true; - } + console.log("Unsubscribing"); + this.ws.send(JSON.stringify({"cmd": "unsubscribe"})); + this.imgData = null; } setPort(streamPort){ - this.stopStream(); - this.frameRxCount = 0; - this.streamPort = streamPort; - this.startStream(); + console.log("Port set to " + streamPort); + this.newStreamPortReq = streamPort; } ws_onOpen() { // Set the flag allowing general server communication this.serverConnectionActive = true; console.log("Connected!"); - this.startStream(); } ws_onClose(e) { - this.setNoStream(); - //Clear flags to stop server communication this.ws = null; this.serverConnectionActive = false; @@ -165,6 +259,7 @@ } ws_connect() { + this.serverConnectionActive = false; this.ws = new WebSocket(this.serverAddr); this.ws.binaryType = "blob"; this.ws.onopen = this.ws_onOpen.bind(this); @@ -187,7 +282,6 @@ var port = document.getElementById("port").value; if(stream == null){ stream = new WebsocketVideoStream("streamImg",port,host); - stream.startStream(); } else { stream.setPort(port); }