Fix selective send (#90)

* Move VisionSettingsChangeSubscriber to own class

* Use selective sending on simple PSCs

* Fix origin context skip logic
This commit is contained in:
Banks T
2020-08-17 16:20:36 -04:00
committed by GitHub
parent b3436765e1
commit 832d8413e1
11 changed files with 264 additions and 278 deletions

View File

@@ -54,7 +54,11 @@ public class Server {
ws.onBinaryMessage(
ctx ->
logger.debug(
"Got WebSockets binary message from host " + ctx.host())));
() -> {
var insa = ctx.session.getRemote().getInetSocketAddress();
var host = insa.getAddress().toString() + ":" + insa.getPort();
return "Got WebSockets binary message from host " + host;
})));
});
var socketHandler = SocketHandler.getInstance();

View File

@@ -70,7 +70,8 @@ public class SocketHandler {
public void onConnect(WsConnectContext context) {
context.session.setIdleTimeout(Long.MAX_VALUE); // TODO: determine better value
var host = context.session.getRemote().getInetSocketAddress().getHostName();
var insa = context.session.getRemote().getInetSocketAddress();
var host = insa.getAddress().toString() + ":" + insa.getPort();
logger.info("New websocket connection from " + host);
users.add(context);
dcService.publishEvent(
@@ -79,7 +80,8 @@ public class SocketHandler {
}
protected void onClose(WsCloseContext context) {
var host = context.session.getRemote().getInetSocketAddress().getHostName();
var insa = context.session.getRemote().getInetSocketAddress();
var host = insa.getAddress().toString() + ":" + insa.getPort();
var reason = context.reason() != null ? context.reason() : "Connection closed by client";
logger.info("Closing websocket connection from " + host + " for reason: " + reason);
users.remove(context);
@@ -301,19 +303,25 @@ public class SocketHandler {
}
}
// TODO: change to use the DataFlow system
private void sendMessage(Object message, WsContext user) throws JsonProcessingException {
ByteBuffer b = ByteBuffer.wrap(objectMapper.writeValueAsBytes(message));
user.send(b);
}
// TODO: change to use the DataFlow system
public void broadcastMessage(Object message, WsContext userToSkip)
throws JsonProcessingException {
for (WsContext user : users) {
if (user != userToSkip) {
if (userToSkip == null) {
for (WsContext user : users) {
sendMessage(message, user);
}
} else {
var skipUserPort = userToSkip.session.getRemote().getInetSocketAddress().getPort();
for (WsContext user : users) {
var userPort = user.session.getRemote().getInetSocketAddress().getPort();
if (userPort != skipUserPort) {
sendMessage(message, user);
}
}
}
}
}

View File

@@ -36,16 +36,15 @@ public class UIInboundSubscriber extends DataChangeSubscriber {
}
@Override
public void onDataChangeEvent(DataChangeEvent event) {
public void onDataChangeEvent(DataChangeEvent<?> event) {
if (event instanceof IncomingWebSocketEvent) {
var incomingWSEvent = (IncomingWebSocketEvent) event;
var incomingWSEvent = (IncomingWebSocketEvent<?>) event;
if (incomingWSEvent.propertyName.equals("userConnected")
|| incomingWSEvent.propertyName.equals("sendFullSettings")) {
// Send full settings
var settings = ConfigManager.getInstance().getConfig().toHashMap();
var message =
new OutgoingUIEvent<>(
UIUpdateType.BROADCAST, "fullsettings", settings, incomingWSEvent.originContext);
new OutgoingUIEvent<>("fullsettings", settings, incomingWSEvent.originContext);
DataChangeService.getInstance().publishEvent(message);
}
}

View File

@@ -20,7 +20,6 @@ package org.photonvision.server;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.Collections;
import java.util.HashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.photonvision.common.dataflow.DataChangeDestination;
import org.photonvision.common.dataflow.DataChangeSource;
import org.photonvision.common.dataflow.DataChangeSubscriber;
@@ -31,7 +30,7 @@ import org.photonvision.common.logging.Logger;
@SuppressWarnings("rawtypes")
/*
* DO NOT use logging in this class. If you do, the logs will recuse forever!
* DO NOT use logging in this class. If you do, the logs will recurse forever!
*/
class UIOutboundSubscriber extends DataChangeSubscriber {
Logger logger = new Logger(UIOutboundSubscriber.class, LogGroup.WebServer);
@@ -48,25 +47,11 @@ class UIOutboundSubscriber extends DataChangeSubscriber {
if (event instanceof OutgoingUIEvent) {
var thisEvent = (OutgoingUIEvent) event;
try {
switch (thisEvent.updateType) {
case BROADCAST:
{
if (event.data instanceof HashMap) {
var data = (HashMap) event.data;
socketHandler.broadcastMessage(data, null);
} else {
socketHandler.broadcastMessage(event.data, null);
}
break;
}
case SINGLEUSER:
{
if (event.data instanceof Pair) {
var pair = (SocketHandler.SelectiveBroadcastPair) event.data;
socketHandler.broadcastMessage(pair.getLeft(), pair.getRight());
}
break;
}
if (event.data instanceof HashMap) {
var data = (HashMap) event.data;
socketHandler.broadcastMessage(data, thisEvent.originContext);
} else {
socketHandler.broadcastMessage(event.data, thisEvent.originContext);
}
} catch (JsonProcessingException e) {
logger.error("Failed to process outgoing message!", e);

View File

@@ -1,23 +0,0 @@
/*
* Copyright (C) 2020 Photon Vision.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.photonvision.server;
public enum UIUpdateType {
BROADCAST,
SINGLEUSER
}