Start implementing network connectivity.

Change-Id: I82875bfd1c3a086033d22de636d4fb43455ebac3
This commit is contained in:
Peter Johnson
2015-07-13 22:46:41 -07:00
parent 8fdaf61ef1
commit e640708245
10 changed files with 647 additions and 0 deletions

71
src/NetworkConnection.cpp Normal file
View File

@@ -0,0 +1,71 @@
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved. */
/* Open Source Software - may be modified and shared by FRC teams. The code */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project. */
/*----------------------------------------------------------------------------*/
#include "NetworkConnection.h"
#include "tcpsockets/TCPStream.h"
#include "raw_socket_istream.h"
#include "WireDecoder.h"
#include "WireEncoder.h"
using namespace ntimpl;
NetworkConnection::NetworkConnection(std::unique_ptr<TCPStream> stream,
BatchQueue& outgoing, Queue& incoming,
Message::GetEntryTypeFunc get_entry_type)
: m_stream(std::move(stream)),
m_outgoing(outgoing),
m_incoming(incoming),
m_get_entry_type(get_entry_type),
m_active(false),
m_proto_rev(0x0300) {}
NetworkConnection::~NetworkConnection() { Stop(); }
void NetworkConnection::Start() {
m_active = true;
m_write_thread = std::thread(&NetworkConnection::WriteThreadMain, this);
m_read_thread = std::thread(&NetworkConnection::ReadThreadMain, this);
}
void NetworkConnection::Stop() {
if (m_stream) m_stream->close();
m_active = false;
if (m_write_thread.joinable()) m_write_thread.join();
if (m_read_thread.joinable()) m_read_thread.join();
}
void NetworkConnection::ReadThreadMain() {
raw_socket_istream is(*m_stream);
WireDecoder decoder(is, m_proto_rev);
while (m_active) {
if (!m_stream)
break;
decoder.set_proto_rev(m_proto_rev);
decoder.Reset();
auto msg = std::make_shared<Message>();
if (!Message::Read(decoder, m_get_entry_type, &(*msg))) break;
m_incoming.push(msg);
}
m_active = false;
}
void NetworkConnection::WriteThreadMain() {
WireEncoder encoder(m_proto_rev);
while (m_active) {
auto msgs = m_outgoing.pop();
encoder.set_proto_rev(m_proto_rev);
encoder.Reset();
for (auto& msg : msgs) msg->Write(encoder);
TCPStream::Error err;
if (!m_stream) break;
if (m_stream->send(encoder.data(), encoder.size(), &err) == 0) break;
}
m_active = false;
}

59
src/NetworkConnection.h Normal file
View File

@@ -0,0 +1,59 @@
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved. */
/* Open Source Software - may be modified and shared by FRC teams. The code */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project. */
/*----------------------------------------------------------------------------*/
#ifndef NT_NETWORKCONNECTION_H_
#define NT_NETWORKCONNECTION_H_
#include <atomic>
#include <memory>
#include <thread>
#include "support/ConcurrentQueue.h"
#include "Message.h"
class TCPStream;
namespace ntimpl {
class NetworkConnection {
public:
typedef ConcurrentQueue<std::shared_ptr<Message>> Queue;
typedef ConcurrentQueue<std::vector<std::shared_ptr<Message>>> BatchQueue;
NetworkConnection(std::unique_ptr<TCPStream> stream, BatchQueue& outgoing,
Queue& incoming, Message::GetEntryTypeFunc get_entry_type);
~NetworkConnection();
void Start();
void Stop();
bool active() const { return m_active; }
TCPStream& stream() { return *m_stream; }
void set_proto_rev(unsigned int proto_rev) { m_proto_rev = proto_rev; }
NetworkConnection(const NetworkConnection&) = delete;
NetworkConnection& operator=(const NetworkConnection&) = delete;
NetworkConnection(NetworkConnection&&) = default;
NetworkConnection& operator=(NetworkConnection&&) = default;
private:
void ReadThreadMain();
void WriteThreadMain();
std::unique_ptr<TCPStream> m_stream;
BatchQueue& m_outgoing;
Queue& m_incoming;
Message::GetEntryTypeFunc m_get_entry_type;
std::thread m_read_thread;
std::thread m_write_thread;
std::atomic_bool m_active;
std::atomic_uint m_proto_rev;
};
} // namespace ntimpl
#endif // NT_NETWORKCONNECTION_H_

View File

@@ -0,0 +1,28 @@
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved. */
/* Open Source Software - may be modified and shared by FRC teams. The code */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project. */
/*----------------------------------------------------------------------------*/
#include "raw_socket_istream.h"
using namespace ntimpl;
raw_socket_istream::~raw_socket_istream() {}
bool raw_socket_istream::read(void* data, std::size_t len) {
char* cdata = static_cast<char*>(data);
std::size_t pos = 0;
while (pos < len) {
TCPStream::Error err;
std::size_t count =
m_stream.receive(&cdata[pos], len - pos, &err, m_timeout);
if (count == 0) return false;
pos += count;
}
return true;
}
void raw_socket_istream::close() { m_stream.close(); }

32
src/raw_socket_istream.h Normal file
View File

@@ -0,0 +1,32 @@
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved. */
/* Open Source Software - may be modified and shared by FRC teams. The code */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project. */
/*----------------------------------------------------------------------------*/
#ifndef NT_RAW_SOCKET_ISTREAM_H_
#define NT_RAW_SOCKET_ISTREAM_H_
#include "raw_istream.h"
#include "tcpsockets/TCPStream.h"
namespace ntimpl {
class raw_socket_istream : public raw_istream {
public:
raw_socket_istream(TCPStream& stream, int timeout = 0)
: m_stream(stream), m_timeout(timeout) {}
virtual ~raw_socket_istream();
virtual bool read(void* data, std::size_t len);
virtual void close();
private:
TCPStream& m_stream;
int m_timeout;
};
} // namespace ntimpl
#endif // NT_RAW_SOCKET_ISTREAM_H_

View File

@@ -0,0 +1,86 @@
/*
TCPAcceptor.cpp
TCPAcceptor class definition. TCPAcceptor provides methods to passively
establish TCP/IP connections with clients.
------------------------------------------
Copyright © 2013 [Vic Hargrave - http://vichargrave.com]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "TCPAcceptor.h"
#include <cstdio>
#include <cstring>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
TCPAcceptor::TCPAcceptor(int port, const char* address)
: m_lsd(0), m_port(port), m_address(address), m_listening(false) {}
TCPAcceptor::~TCPAcceptor() {
if (m_lsd > 0) {
close(m_lsd);
}
}
int TCPAcceptor::start() {
if (m_listening) return 0;
m_lsd = socket(PF_INET, SOCK_STREAM, 0);
struct sockaddr_in address;
std::memset(&address, 0, sizeof(address));
address.sin_family = PF_INET;
address.sin_port = htons(m_port);
if (m_address.size() > 0) {
inet_pton(PF_INET, m_address.c_str(), &(address.sin_addr));
} else {
address.sin_addr.s_addr = INADDR_ANY;
}
int optval = 1;
setsockopt(m_lsd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval);
int result = bind(m_lsd, (struct sockaddr*)&address, sizeof(address));
if (result != 0) {
perror("bind() failed");
return result;
}
result = listen(m_lsd, 5);
if (result != 0) {
perror("listen() failed");
return result;
}
m_listening = true;
return result;
}
std::unique_ptr<TCPStream> TCPAcceptor::accept() {
if (!m_listening) return nullptr;
struct sockaddr_in address;
socklen_t len = sizeof(address);
std::memset(&address, 0, sizeof(address));
int sd = ::accept(m_lsd, (struct sockaddr*)&address, &len);
if (sd < 0) {
perror("accept() failed");
return nullptr;
}
return std::unique_ptr<TCPStream>(new TCPStream(sd, &address));
}

View File

@@ -0,0 +1,49 @@
/*
TCPAcceptor.h
TCPAcceptor class interface. TCPAcceptor provides methods to passively
establish TCP/IP connections with clients.
------------------------------------------
Copyright © 2013 [Vic Hargrave - http://vichargrave.com]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef TCPSOCKETS_TCPACCEPTOR_H_
#define TCPSOCKETS_TCPACCEPTOR_H_
#include <memory>
#include <string>
#include "TCPStream.h"
class TCPAcceptor {
int m_lsd;
int m_port;
std::string m_address;
bool m_listening;
public:
TCPAcceptor(int port, const char* address);
~TCPAcceptor();
int start();
std::unique_ptr<TCPStream> accept();
private:
TCPAcceptor() {}
};
#endif

View File

@@ -0,0 +1,120 @@
/*
TCPConnector.h
TCPConnector class definition. TCPConnector provides methods to actively
establish TCP/IP connections with a server.
------------------------------------------
Copyright © 2013 [Vic Hargrave - http://vichargrave.com]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License
*/
#include "TCPConnector.h"
#include <errno.h>
#include <fcntl.h>
#include <cstdio>
#include <cstring>
#include <netdb.h>
#include <arpa/inet.h>
#include <netinet/in.h>
static int ResolveHostName(const char* hostname, struct in_addr* addr) {
struct addrinfo* res;
int result = getaddrinfo(hostname, nullptr, nullptr, &res);
if (result == 0) {
std::memcpy(addr, &((struct sockaddr_in*)res->ai_addr)->sin_addr,
sizeof(struct in_addr));
freeaddrinfo(res);
}
return result;
}
std::unique_ptr<TCPStream> TCPConnector::connect(const char* server, int port) {
struct sockaddr_in address;
std::memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port);
if (ResolveHostName(server, &(address.sin_addr)) != 0) {
inet_pton(PF_INET, server, &(address.sin_addr));
}
int sd = socket(AF_INET, SOCK_STREAM, 0);
if (::connect(sd, (struct sockaddr*)&address, sizeof(address)) != 0) {
perror("connect() failed");
return nullptr;
}
return std::unique_ptr<TCPStream>(new TCPStream(sd, &address));
}
std::unique_ptr<TCPStream> TCPConnector::connect(const char* server, int port,
int timeout) {
if (timeout == 0) return connect(server, port);
struct sockaddr_in address;
std::memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port);
if (ResolveHostName(server, &(address.sin_addr)) != 0) {
inet_pton(PF_INET, server, &(address.sin_addr));
}
long arg;
fd_set sdset;
struct timeval tv;
socklen_t len;
int result = -1, valopt, sd = socket(AF_INET, SOCK_STREAM, 0);
// Set socket to non-blocking
arg = fcntl(sd, F_GETFL, nullptr);
arg |= O_NONBLOCK;
fcntl(sd, F_SETFL, arg);
// Connect with time limit
std::string message;
if ((result = ::connect(sd, (struct sockaddr*)&address, sizeof(address))) <
0) {
if (errno == EINPROGRESS) {
tv.tv_sec = timeout;
tv.tv_usec = 0;
FD_ZERO(&sdset);
FD_SET(sd, &sdset);
if (select(sd + 1, nullptr, &sdset, nullptr, &tv) > 0) {
len = sizeof(int);
getsockopt(sd, SOL_SOCKET, SO_ERROR, (void*)(&valopt), &len);
if (valopt) {
fprintf(stderr, "connect() error %d - %s\n", valopt,
strerror(valopt));
}
// connection established
else
result = 0;
} else
fprintf(stderr, "connect() timed out\n");
} else
fprintf(stderr, "connect() error %d - %s\n", errno, strerror(errno));
}
// Return socket to blocking mode
arg = fcntl(sd, F_GETFL, nullptr);
arg &= (~O_NONBLOCK);
fcntl(sd, F_SETFL, arg);
// Create stream object if connected
if (result == -1) return nullptr;
return std::unique_ptr<TCPStream>(new TCPStream(sd, &address));
}

View File

@@ -0,0 +1,38 @@
/*
TCPConnector.h
TCPConnector class interface. TCPConnector provides methods to actively
establish TCP/IP connections with a server.
------------------------------------------
Copyright © 2013 [Vic Hargrave - http://vichargrave.com]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License
*/
#ifndef TCPSOCKETS_TCPCONNECTOR_H_
#define TCPSOCKETS_TCPCONNECTOR_H_
#include <memory>
#include "TCPStream.h"
class TCPConnector {
public:
static std::unique_ptr<TCPStream> connect(const char* server, int port);
static std::unique_ptr<TCPStream> connect(const char* server, int port,
int timeout);
};
#endif

View File

@@ -0,0 +1,92 @@
/*
TCPStream.h
TCPStream class definition. TCPStream provides methods to trasnfer
data between peers over a TCP/IP connection.
------------------------------------------
Copyright © 2013 [Vic Hargrave - http://vichargrave.com]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "TCPStream.h"
#include <arpa/inet.h>
#include <unistd.h>
TCPStream::TCPStream(int sd, struct sockaddr_in* address) : m_sd(sd) {
char ip[50];
inet_ntop(PF_INET, (struct in_addr*)&(address->sin_addr.s_addr), ip,
sizeof(ip) - 1);
m_peerIP = ip;
m_peerPort = ntohs(address->sin_port);
}
TCPStream::~TCPStream() { close(); }
std::size_t TCPStream::send(const char* buffer, std::size_t len, Error* err) {
if (m_sd < 0) {
*err = kConnectionClosed;
return 0;
}
ssize_t rv = write(m_sd, buffer, len);
if (rv < 0) {
*err = kConnectionReset;
return 0;
}
return static_cast<std::size_t>(rv);
}
std::size_t TCPStream::receive(char* buffer, std::size_t len, Error* err,
int timeout) {
if (m_sd < 0) {
*err = kConnectionClosed;
return 0;
}
ssize_t rv;
if (timeout <= 0)
rv = read(m_sd, buffer, len);
else if (WaitForReadEvent(timeout))
rv = read(m_sd, buffer, len);
else {
*err = kConnectionTimedOut;
return 0;
}
if (rv < 0) {
*err = kConnectionReset;
return 0;
}
return static_cast<std::size_t>(rv);
}
void TCPStream::close() {
if (m_sd >= 0)
::close(m_sd);
m_sd = -1;
}
bool TCPStream::WaitForReadEvent(int timeout) {
fd_set sdset;
struct timeval tv;
tv.tv_sec = timeout;
tv.tv_usec = 0;
FD_ZERO(&sdset);
FD_SET(m_sd, &sdset);
if (select(m_sd + 1, &sdset, NULL, NULL, &tv) > 0) {
return true;
}
return false;
}

View File

@@ -0,0 +1,72 @@
/*
TCPStream.h
TCPStream class interface. TCPStream provides methods to trasnfer
data between peers over a TCP/IP connection.
------------------------------------------
Copyright © 2013 [Vic Hargrave - http://vichargrave.com]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef TCPSOCKETS_TCPSTREAM_H_
#define TCPSOCKETS_TCPSTREAM_H_
#include <cstddef>
#include <string>
#ifdef _WIN32
#include <winsock2.h>
#else
#include <sys/socket.h>
#endif
#include "llvm/StringRef.h"
class TCPStream {
int m_sd;
std::string m_peerIP;
int m_peerPort;
public:
friend class TCPAcceptor;
friend class TCPConnector;
~TCPStream();
enum Error {
kConnectionClosed = 0,
kConnectionReset = -1,
kConnectionTimedOut = -2
};
std::size_t send(const char* buffer, std::size_t len, Error* err);
std::size_t receive(char* buffer, std::size_t len, Error* err,
int timeout = 0);
void close();
llvm::StringRef getPeerIP() const { return m_peerIP; }
int getPeerPort() const { return m_peerPort; }
TCPStream(const TCPStream& stream) = delete;
TCPStream& operator=(const TCPStream&) = delete;
private:
bool WaitForReadEvent(int timeout);
TCPStream(int sd, struct sockaddr_in* address);
TCPStream() = delete;
};
#endif