Switch DS Sim Sockets to use UV loops (#1211)

Also adds the simulation packet to force switch the DS to local.
This commit is contained in:
Thad House
2018-07-22 23:03:55 -07:00
committed by Peter Johnson
parent eb2c6e19f8
commit a34df5589e
6 changed files with 147 additions and 178 deletions

View File

@@ -17,6 +17,8 @@
#include <FRCComm.h>
#include <mockdata/DriverStationData.h>
using namespace halsim;
/*----------------------------------------------------------------------------
** The following methods help parse and hold information about the
** driver station and it's joysticks.

View File

@@ -19,146 +19,87 @@
#include <cstring>
#include <iostream>
#if defined(Win32) || defined(_WIN32)
#include <windows.h>
#pragma comment(lib, "Ws2_32.lib")
#else
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
#endif
#include <DSCommPacket.h>
#include <wpi/EventLoopRunner.h>
#include <wpi/StringRef.h>
#include <wpi/raw_ostream.h>
#include <wpi/uv/Tcp.h>
#include <wpi/uv/Timer.h>
#include <wpi/uv/Udp.h>
#include <wpi/uv/util.h>
/*----------------------------------------------------------------------------
** Open a socket and listen for connections
** Returns socket handle on success, -1 on error
**--------------------------------------------------------------------------*/
static int OpenListenSocket(int port, bool tcp) {
#if defined(WIN32) || defined(_WIN32)
SOCKET s;
#else
int s;
#if defined(Win32) || defined(_WIN32)
#pragma comment(lib, "Ws2_32.lib")
#endif
if (tcp)
s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
else
s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
using namespace wpi::uv;
if (!s) {
std::perror("socket");
return -1;
}
static std::unique_ptr<Buffer> singleByte;
int reuse = 1;
#if defined(WIN32) || defined(_WIN32)
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&reuse),
sizeof(reuse));
#else
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, static_cast<void*>(&reuse),
sizeof(reuse));
#endif
namespace {
struct DataStore {
wpi::SmallVector<char, 128> m_frame;
size_t m_frameSize = std::numeric_limits<size_t>::max();
halsim::DSCommPacket* dsPacket;
};
} // namespace
struct sockaddr_in addr;
std::memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(s, (struct sockaddr*)&addr, sizeof(addr))) {
std::perror("bind");
#if defined(WIN32) || defined(_WIN32)
closesocket(s);
#else
close(s);
#endif
return -1;
}
if (tcp) {
if (listen(s, 10)) {
#if defined(WIN32) || defined(_WIN32)
closesocket(s);
#else
close(s);
#endif
return -1;
static void HandleTcpDataStream(Buffer& buf, size_t size, DataStore& store) {
wpi::StringRef data{buf.base, size};
while (!data.empty()) {
if (store.m_frameSize != std::numeric_limits<size_t>::max()) {
if (store.m_frame.size() < 2u) {
size_t toCopy = std::min(2u - store.m_frame.size(), data.size());
store.m_frame.append(data.bytes_begin(), data.bytes_begin() + toCopy);
data = data.drop_front(toCopy);
if (store.m_frame.size() < 2u) return; // need more data
}
store.m_frameSize = (static_cast<uint16_t>(store.m_frame[0]) << 8) |
static_cast<uint16_t>(store.m_frame[1]);
}
if (store.m_frameSize != 0) {
size_t need = store.m_frameSize - (store.m_frame.size() - 2);
size_t toCopy = std::min(need, data.size());
store.m_frame.append(data.bytes_begin(), data.bytes_begin() + toCopy);
data = data.drop_front(toCopy);
need -= toCopy;
if (need == 0) {
auto ds = store.dsPacket;
ds->DecodeTCP(reinterpret_cast<uint8_t*>(store.m_frame.data()),
store.m_frame.size());
ds->SendTCPToHALSim();
store.m_frame.clear();
store.m_frameSize = std::numeric_limits<size_t>::max();
}
}
}
return s;
}
/*----------------------------------------------------------------------------
** TCP thread
** This thread only dies at program shut down; it opens a socket
** and listens for incoming connections, and then processes data
** sent on the socket.
**--------------------------------------------------------------------------*/
static void TCPThreadFunc(DSCommPacket* ds) {
static const int kTCPPort = 1740;
while (true) {
int socket = OpenListenSocket(kTCPPort, true);
if (socket < 0) {
#if defined(WIN32) || defined(_WIN32)
Sleep(1000);
#else
sleep(1);
#endif
continue;
}
static void SetupTcp(wpi::uv::Loop& loop) {
auto tcp = Tcp::Create(loop);
auto tcpWaitTimer = Timer::Create(loop);
while (true) {
int client = accept(socket, NULL, 0);
if (client < 0) {
#if defined(WIN32) || defined(_WIN32)
closesocket(socket);
#else
close(socket);
#endif
break;
}
auto recStore = std::make_shared<DataStore>();
uint8_t buf[8192];
int len = 0;
do {
int rc = recv(client, reinterpret_cast<char*>(buf + len),
sizeof(buf) - len, 0);
if (rc <= 0) break;
len += rc;
tcp->SetData(recStore);
do {
int deduct = ds->DecodeTCP(buf, len);
if (deduct <= 0) break;
std::memmove(buf, buf + deduct, len - deduct);
len -= deduct;
if (deduct > 2) ds->SendTCPToHALSim();
} while (true);
} while (true);
tcp->Bind("0.0.0.0", 1740);
#if defined(WIN32) || defined(_WIN32)
closesocket(client);
Sleep(1000);
#else
close(client);
sleep(1);
#endif
}
}
tcp->Listen([t = tcp.get()] {
auto client = t->Accept();
t->data.connect([t](Buffer& buf, size_t len) {
HandleTcpDataStream(buf, len, *t->GetData<DataStore>());
});
});
}
/*----------------------------------------------------------------------------
** Send a reply packet back to the DS
**--------------------------------------------------------------------------*/
static void SendReplyPacket(int socket, struct sockaddr* addr, int addrlen,
int reply_port, DSCommPacket* ds) {
static void SetupReplyPacket(halsim::DSCommPacket* ds) {
static const uint8_t kTagGeneral = 0x01;
struct sockaddr_in* in4 = reinterpret_cast<struct sockaddr_in*>(addr);
in4->sin_port = htons(reply_port);
uint8_t data[8];
std::memset(data, 0, sizeof(data));
uint8_t* data = reinterpret_cast<uint8_t*>(ds->GetSendBuffer().base);
ds->GetIndex(data[0], data[1]);
@@ -169,58 +110,61 @@ static void SendReplyPacket(int socket, struct sockaddr* addr, int addrlen,
data[5] = 12; // Voltage upper
data[6] = 0; // Voltage lower
data[7] = 0; // Request
#if defined(WIN32) || defined(_WIN32)
sendto(socket, reinterpret_cast<const char*>(data), sizeof(data), 0, addr,
addrlen);
#else
sendto(socket, data, sizeof(data), 0, addr, addrlen);
#endif
}
/*----------------------------------------------------------------------------
** UDP thread
** This thread only dies at program shut down; it opens a socket
** and listens for incoming connections, and then processes data
** sent on the socket.
**--------------------------------------------------------------------------*/
static void UDPThreadFunc(DSCommPacket* ds) {
static const int kUDPListenPort = 1110;
static const int kUDPReplyPort = 1150;
while (true) {
int socket = OpenListenSocket(kUDPListenPort, false);
if (socket < 0) {
#if defined(WIN32) || defined(_WIN32)
Sleep(1000);
#else
sleep(1);
#endif
continue;
}
static void SetupUdp(wpi::uv::Loop& loop) {
auto udp = wpi::uv::Udp::Create(loop);
udp->Bind("0.0.0.0", 1110);
do {
uint8_t buf[1024];
struct sockaddr addr;
#if defined(Win32) || defined(_WIN32)
int addrlen = sizeof(addr);
int rc = recvfrom(socket, reinterpret_cast<char*>(buf), sizeof(buf), 0,
&addr, &addrlen);
#else
socklen_t addrlen = sizeof(addr);
ssize_t rc = recvfrom(socket, buf, sizeof(buf), 0, &addr, &addrlen);
#endif
// Simulation mode packet
auto simLoopTimer = Timer::Create(loop);
struct sockaddr_in simAddr;
NameToAddr("127.0.0.1", 1135, &simAddr);
simLoopTimer->timeout.connect([ udpLocal = udp.get(), simAddr ] {
udpLocal->Send(simAddr, wpi::ArrayRef<Buffer>{singleByte.get(), 1},
[](auto buf, Error err) {
if (err) {
wpi::errs() << err.str() << "\n";
wpi::errs().flush();
}
});
});
simLoopTimer->Start(Timer::Time{100}, Timer::Time{100});
if (rc > 0) {
ds->DecodeUDP(buf, rc);
SendReplyPacket(socket, &addr, addrlen, kUDPReplyPort, ds);
ds->SendUDPToHALSim();
} else {
break;
}
} while (true);
}
// UDP Receive then send
udp->received.connect([udpLocal = udp.get()](
Buffer & buf, size_t len, const sockaddr& recSock, unsigned int port) {
auto ds = udpLocal->GetLoop()->GetData<halsim::DSCommPacket>();
ds->DecodeUDP(reinterpret_cast<uint8_t*>(buf.base), len);
SetupReplyPacket(ds.get());
struct sockaddr_in outAddr;
std::memcpy(&outAddr, &recSock, sizeof(sockaddr_in));
outAddr.sin_family = PF_INET;
outAddr.sin_port = htons(1150);
udpLocal->Send(outAddr, wpi::ArrayRef<Buffer>{&ds->GetSendBuffer(), 1},
[](auto buf, Error err) {
if (err) {
wpi::errs() << err.str() << "\n";
wpi::errs().flush();
}
});
ds->SendUDPToHALSim();
});
udp->StartRecv();
}
static void SetupEventLoop(wpi::uv::Loop& loop) {
auto loopData = std::make_shared<halsim::DSCommPacket>();
loop.SetData(loopData);
SetupUdp(loop);
SetupTcp(loop);
}
static std::unique_ptr<wpi::EventLoopRunner> eventLoopRunner;
/*----------------------------------------------------------------------------
** Main entry point. We will start listen threads going, processing
** against our driver station packet
@@ -230,9 +174,6 @@ extern "C" {
__declspec(dllexport)
#endif
int HALSIM_InitExtension(void) {
static DSCommPacket ds;
static std::thread udp_thread;
static std::thread tcp_thread;
static bool once = false;
if (once) {
@@ -244,11 +185,11 @@ __declspec(dllexport)
std::cout << "DriverStationSocket Initializing." << std::endl;
udp_thread = std::thread(UDPThreadFunc, &ds);
udp_thread.detach();
singleByte = std::make_unique<Buffer>("0");
tcp_thread = std::thread(TCPThreadFunc, &ds);
tcp_thread.detach();
eventLoopRunner = std::make_unique<wpi::EventLoopRunner>();
eventLoopRunner->ExecAsync(SetupEventLoop);
std::cout << "DriverStationSocket Initialized!" << std::endl;
return 0;