diff --git a/src/NetworkConnection.cpp b/src/NetworkConnection.cpp new file mode 100644 index 0000000000..b210964826 --- /dev/null +++ b/src/NetworkConnection.cpp @@ -0,0 +1,71 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) FIRST 2015. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#include "NetworkConnection.h" + +#include "tcpsockets/TCPStream.h" +#include "raw_socket_istream.h" +#include "WireDecoder.h" +#include "WireEncoder.h" + +using namespace ntimpl; + +NetworkConnection::NetworkConnection(std::unique_ptr stream, + BatchQueue& outgoing, Queue& incoming, + Message::GetEntryTypeFunc get_entry_type) + : m_stream(std::move(stream)), + m_outgoing(outgoing), + m_incoming(incoming), + m_get_entry_type(get_entry_type), + m_active(false), + m_proto_rev(0x0300) {} + +NetworkConnection::~NetworkConnection() { Stop(); } + +void NetworkConnection::Start() { + m_active = true; + m_write_thread = std::thread(&NetworkConnection::WriteThreadMain, this); + m_read_thread = std::thread(&NetworkConnection::ReadThreadMain, this); +} + +void NetworkConnection::Stop() { + if (m_stream) m_stream->close(); + m_active = false; + if (m_write_thread.joinable()) m_write_thread.join(); + if (m_read_thread.joinable()) m_read_thread.join(); +} + +void NetworkConnection::ReadThreadMain() { + raw_socket_istream is(*m_stream); + WireDecoder decoder(is, m_proto_rev); + + while (m_active) { + if (!m_stream) + break; + decoder.set_proto_rev(m_proto_rev); + decoder.Reset(); + auto msg = std::make_shared(); + if (!Message::Read(decoder, m_get_entry_type, &(*msg))) break; + m_incoming.push(msg); + } + m_active = false; +} + +void NetworkConnection::WriteThreadMain() { + WireEncoder encoder(m_proto_rev); + + while (m_active) { + auto msgs = m_outgoing.pop(); + encoder.set_proto_rev(m_proto_rev); + encoder.Reset(); + for (auto& msg : msgs) msg->Write(encoder); + TCPStream::Error err; + if (!m_stream) break; + if (m_stream->send(encoder.data(), encoder.size(), &err) == 0) break; + } + m_active = false; +} diff --git a/src/NetworkConnection.h b/src/NetworkConnection.h new file mode 100644 index 0000000000..79aa57c6c6 --- /dev/null +++ b/src/NetworkConnection.h @@ -0,0 +1,59 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) FIRST 2015. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#ifndef NT_NETWORKCONNECTION_H_ +#define NT_NETWORKCONNECTION_H_ + +#include +#include +#include + +#include "support/ConcurrentQueue.h" +#include "Message.h" + +class TCPStream; + +namespace ntimpl { + +class NetworkConnection { + public: + typedef ConcurrentQueue> Queue; + typedef ConcurrentQueue>> BatchQueue; + + NetworkConnection(std::unique_ptr stream, BatchQueue& outgoing, + Queue& incoming, Message::GetEntryTypeFunc get_entry_type); + ~NetworkConnection(); + + void Start(); + void Stop(); + + bool active() const { return m_active; } + TCPStream& stream() { return *m_stream; } + void set_proto_rev(unsigned int proto_rev) { m_proto_rev = proto_rev; } + + NetworkConnection(const NetworkConnection&) = delete; + NetworkConnection& operator=(const NetworkConnection&) = delete; + NetworkConnection(NetworkConnection&&) = default; + NetworkConnection& operator=(NetworkConnection&&) = default; + + private: + void ReadThreadMain(); + void WriteThreadMain(); + + std::unique_ptr m_stream; + BatchQueue& m_outgoing; + Queue& m_incoming; + Message::GetEntryTypeFunc m_get_entry_type; + std::thread m_read_thread; + std::thread m_write_thread; + std::atomic_bool m_active; + std::atomic_uint m_proto_rev; +}; + +} // namespace ntimpl + +#endif // NT_NETWORKCONNECTION_H_ diff --git a/src/raw_socket_istream.cpp b/src/raw_socket_istream.cpp new file mode 100644 index 0000000000..81ca089951 --- /dev/null +++ b/src/raw_socket_istream.cpp @@ -0,0 +1,28 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) FIRST 2015. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#include "raw_socket_istream.h" + +using namespace ntimpl; + +raw_socket_istream::~raw_socket_istream() {} + +bool raw_socket_istream::read(void* data, std::size_t len) { + char* cdata = static_cast(data); + std::size_t pos = 0; + + while (pos < len) { + TCPStream::Error err; + std::size_t count = + m_stream.receive(&cdata[pos], len - pos, &err, m_timeout); + if (count == 0) return false; + pos += count; + } + return true; +} + +void raw_socket_istream::close() { m_stream.close(); } diff --git a/src/raw_socket_istream.h b/src/raw_socket_istream.h new file mode 100644 index 0000000000..ecc8287796 --- /dev/null +++ b/src/raw_socket_istream.h @@ -0,0 +1,32 @@ +/*----------------------------------------------------------------------------*/ +/* Copyright (c) FIRST 2015. All Rights Reserved. */ +/* Open Source Software - may be modified and shared by FRC teams. The code */ +/* must be accompanied by the FIRST BSD license file in the root directory of */ +/* the project. */ +/*----------------------------------------------------------------------------*/ + +#ifndef NT_RAW_SOCKET_ISTREAM_H_ +#define NT_RAW_SOCKET_ISTREAM_H_ + +#include "raw_istream.h" + +#include "tcpsockets/TCPStream.h" + +namespace ntimpl { + +class raw_socket_istream : public raw_istream { + public: + raw_socket_istream(TCPStream& stream, int timeout = 0) + : m_stream(stream), m_timeout(timeout) {} + virtual ~raw_socket_istream(); + virtual bool read(void* data, std::size_t len); + virtual void close(); + + private: + TCPStream& m_stream; + int m_timeout; +}; + +} // namespace ntimpl + +#endif // NT_RAW_SOCKET_ISTREAM_H_ diff --git a/src/tcpsockets/TCPAcceptor.cpp b/src/tcpsockets/TCPAcceptor.cpp new file mode 100644 index 0000000000..a63f5a2f62 --- /dev/null +++ b/src/tcpsockets/TCPAcceptor.cpp @@ -0,0 +1,86 @@ +/* + TCPAcceptor.cpp + + TCPAcceptor class definition. TCPAcceptor provides methods to passively + establish TCP/IP connections with clients. + + ------------------------------------------ + + Copyright © 2013 [Vic Hargrave - http://vichargrave.com] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "TCPAcceptor.h" + +#include +#include +#include +#include +#include + +TCPAcceptor::TCPAcceptor(int port, const char* address) + : m_lsd(0), m_port(port), m_address(address), m_listening(false) {} + +TCPAcceptor::~TCPAcceptor() { + if (m_lsd > 0) { + close(m_lsd); + } +} + +int TCPAcceptor::start() { + if (m_listening) return 0; + + m_lsd = socket(PF_INET, SOCK_STREAM, 0); + struct sockaddr_in address; + + std::memset(&address, 0, sizeof(address)); + address.sin_family = PF_INET; + address.sin_port = htons(m_port); + if (m_address.size() > 0) { + inet_pton(PF_INET, m_address.c_str(), &(address.sin_addr)); + } else { + address.sin_addr.s_addr = INADDR_ANY; + } + + int optval = 1; + setsockopt(m_lsd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval); + + int result = bind(m_lsd, (struct sockaddr*)&address, sizeof(address)); + if (result != 0) { + perror("bind() failed"); + return result; + } + + result = listen(m_lsd, 5); + if (result != 0) { + perror("listen() failed"); + return result; + } + m_listening = true; + return result; +} + +std::unique_ptr TCPAcceptor::accept() { + if (!m_listening) return nullptr; + + struct sockaddr_in address; + socklen_t len = sizeof(address); + std::memset(&address, 0, sizeof(address)); + int sd = ::accept(m_lsd, (struct sockaddr*)&address, &len); + if (sd < 0) { + perror("accept() failed"); + return nullptr; + } + return std::unique_ptr(new TCPStream(sd, &address)); +} diff --git a/src/tcpsockets/TCPAcceptor.h b/src/tcpsockets/TCPAcceptor.h new file mode 100644 index 0000000000..c0fddf67f8 --- /dev/null +++ b/src/tcpsockets/TCPAcceptor.h @@ -0,0 +1,49 @@ +/* + TCPAcceptor.h + + TCPAcceptor class interface. TCPAcceptor provides methods to passively + establish TCP/IP connections with clients. + + ------------------------------------------ + + Copyright © 2013 [Vic Hargrave - http://vichargrave.com] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef TCPSOCKETS_TCPACCEPTOR_H_ +#define TCPSOCKETS_TCPACCEPTOR_H_ + +#include +#include + +#include "TCPStream.h" + +class TCPAcceptor { + int m_lsd; + int m_port; + std::string m_address; + bool m_listening; + + public: + TCPAcceptor(int port, const char* address); + ~TCPAcceptor(); + + int start(); + std::unique_ptr accept(); + + private: + TCPAcceptor() {} +}; + +#endif diff --git a/src/tcpsockets/TCPConnector.cpp b/src/tcpsockets/TCPConnector.cpp new file mode 100644 index 0000000000..aebe2176a7 --- /dev/null +++ b/src/tcpsockets/TCPConnector.cpp @@ -0,0 +1,120 @@ +/* + TCPConnector.h + + TCPConnector class definition. TCPConnector provides methods to actively + establish TCP/IP connections with a server. + + ------------------------------------------ + + Copyright © 2013 [Vic Hargrave - http://vichargrave.com] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License +*/ + +#include "TCPConnector.h" + +#include +#include +#include +#include +#include +#include +#include + +static int ResolveHostName(const char* hostname, struct in_addr* addr) { + struct addrinfo* res; + + int result = getaddrinfo(hostname, nullptr, nullptr, &res); + if (result == 0) { + std::memcpy(addr, &((struct sockaddr_in*)res->ai_addr)->sin_addr, + sizeof(struct in_addr)); + freeaddrinfo(res); + } + return result; +} + +std::unique_ptr TCPConnector::connect(const char* server, int port) { + struct sockaddr_in address; + + std::memset(&address, 0, sizeof(address)); + address.sin_family = AF_INET; + address.sin_port = htons(port); + if (ResolveHostName(server, &(address.sin_addr)) != 0) { + inet_pton(PF_INET, server, &(address.sin_addr)); + } + int sd = socket(AF_INET, SOCK_STREAM, 0); + if (::connect(sd, (struct sockaddr*)&address, sizeof(address)) != 0) { + perror("connect() failed"); + return nullptr; + } + return std::unique_ptr(new TCPStream(sd, &address)); +} + +std::unique_ptr TCPConnector::connect(const char* server, int port, + int timeout) { + if (timeout == 0) return connect(server, port); + + struct sockaddr_in address; + + std::memset(&address, 0, sizeof(address)); + address.sin_family = AF_INET; + address.sin_port = htons(port); + if (ResolveHostName(server, &(address.sin_addr)) != 0) { + inet_pton(PF_INET, server, &(address.sin_addr)); + } + + long arg; + fd_set sdset; + struct timeval tv; + socklen_t len; + int result = -1, valopt, sd = socket(AF_INET, SOCK_STREAM, 0); + + // Set socket to non-blocking + arg = fcntl(sd, F_GETFL, nullptr); + arg |= O_NONBLOCK; + fcntl(sd, F_SETFL, arg); + + // Connect with time limit + std::string message; + if ((result = ::connect(sd, (struct sockaddr*)&address, sizeof(address))) < + 0) { + if (errno == EINPROGRESS) { + tv.tv_sec = timeout; + tv.tv_usec = 0; + FD_ZERO(&sdset); + FD_SET(sd, &sdset); + if (select(sd + 1, nullptr, &sdset, nullptr, &tv) > 0) { + len = sizeof(int); + getsockopt(sd, SOL_SOCKET, SO_ERROR, (void*)(&valopt), &len); + if (valopt) { + fprintf(stderr, "connect() error %d - %s\n", valopt, + strerror(valopt)); + } + // connection established + else + result = 0; + } else + fprintf(stderr, "connect() timed out\n"); + } else + fprintf(stderr, "connect() error %d - %s\n", errno, strerror(errno)); + } + + // Return socket to blocking mode + arg = fcntl(sd, F_GETFL, nullptr); + arg &= (~O_NONBLOCK); + fcntl(sd, F_SETFL, arg); + + // Create stream object if connected + if (result == -1) return nullptr; + return std::unique_ptr(new TCPStream(sd, &address)); +} diff --git a/src/tcpsockets/TCPConnector.h b/src/tcpsockets/TCPConnector.h new file mode 100644 index 0000000000..8c32b79754 --- /dev/null +++ b/src/tcpsockets/TCPConnector.h @@ -0,0 +1,38 @@ +/* + TCPConnector.h + + TCPConnector class interface. TCPConnector provides methods to actively + establish TCP/IP connections with a server. + + ------------------------------------------ + + Copyright © 2013 [Vic Hargrave - http://vichargrave.com] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License +*/ + +#ifndef TCPSOCKETS_TCPCONNECTOR_H_ +#define TCPSOCKETS_TCPCONNECTOR_H_ + +#include + +#include "TCPStream.h" + +class TCPConnector { + public: + static std::unique_ptr connect(const char* server, int port); + static std::unique_ptr connect(const char* server, int port, + int timeout); +}; + +#endif diff --git a/src/tcpsockets/TCPStream.cpp b/src/tcpsockets/TCPStream.cpp new file mode 100644 index 0000000000..a793a2ee0f --- /dev/null +++ b/src/tcpsockets/TCPStream.cpp @@ -0,0 +1,92 @@ +/* + TCPStream.h + + TCPStream class definition. TCPStream provides methods to trasnfer + data between peers over a TCP/IP connection. + + ------------------------------------------ + + Copyright © 2013 [Vic Hargrave - http://vichargrave.com] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "TCPStream.h" + +#include +#include + +TCPStream::TCPStream(int sd, struct sockaddr_in* address) : m_sd(sd) { + char ip[50]; + inet_ntop(PF_INET, (struct in_addr*)&(address->sin_addr.s_addr), ip, + sizeof(ip) - 1); + m_peerIP = ip; + m_peerPort = ntohs(address->sin_port); +} + +TCPStream::~TCPStream() { close(); } + +std::size_t TCPStream::send(const char* buffer, std::size_t len, Error* err) { + if (m_sd < 0) { + *err = kConnectionClosed; + return 0; + } + ssize_t rv = write(m_sd, buffer, len); + if (rv < 0) { + *err = kConnectionReset; + return 0; + } + return static_cast(rv); +} + +std::size_t TCPStream::receive(char* buffer, std::size_t len, Error* err, + int timeout) { + if (m_sd < 0) { + *err = kConnectionClosed; + return 0; + } + ssize_t rv; + if (timeout <= 0) + rv = read(m_sd, buffer, len); + else if (WaitForReadEvent(timeout)) + rv = read(m_sd, buffer, len); + else { + *err = kConnectionTimedOut; + return 0; + } + if (rv < 0) { + *err = kConnectionReset; + return 0; + } + return static_cast(rv); +} + +void TCPStream::close() { + if (m_sd >= 0) + ::close(m_sd); + m_sd = -1; +} + +bool TCPStream::WaitForReadEvent(int timeout) { + fd_set sdset; + struct timeval tv; + + tv.tv_sec = timeout; + tv.tv_usec = 0; + FD_ZERO(&sdset); + FD_SET(m_sd, &sdset); + if (select(m_sd + 1, &sdset, NULL, NULL, &tv) > 0) { + return true; + } + return false; +} diff --git a/src/tcpsockets/TCPStream.h b/src/tcpsockets/TCPStream.h new file mode 100644 index 0000000000..32b3b7cbf6 --- /dev/null +++ b/src/tcpsockets/TCPStream.h @@ -0,0 +1,72 @@ +/* + TCPStream.h + + TCPStream class interface. TCPStream provides methods to trasnfer + data between peers over a TCP/IP connection. + + ------------------------------------------ + + Copyright © 2013 [Vic Hargrave - http://vichargrave.com] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef TCPSOCKETS_TCPSTREAM_H_ +#define TCPSOCKETS_TCPSTREAM_H_ + +#include +#include + +#ifdef _WIN32 +#include +#else +#include +#endif + +#include "llvm/StringRef.h" + +class TCPStream { + int m_sd; + std::string m_peerIP; + int m_peerPort; + + public: + friend class TCPAcceptor; + friend class TCPConnector; + + ~TCPStream(); + + enum Error { + kConnectionClosed = 0, + kConnectionReset = -1, + kConnectionTimedOut = -2 + }; + + std::size_t send(const char* buffer, std::size_t len, Error* err); + std::size_t receive(char* buffer, std::size_t len, Error* err, + int timeout = 0); + void close(); + + llvm::StringRef getPeerIP() const { return m_peerIP; } + int getPeerPort() const { return m_peerPort; } + + TCPStream(const TCPStream& stream) = delete; + TCPStream& operator=(const TCPStream&) = delete; + private: + bool WaitForReadEvent(int timeout); + + TCPStream(int sd, struct sockaddr_in* address); + TCPStream() = delete; +}; + +#endif