Add NetworkAcceptor and NetworkStream interfaces.

These allow mocking of TCPAcceptor and TCPStream respectively.
This commit is contained in:
Peter Johnson
2015-08-02 00:06:27 -07:00
parent 6703968f73
commit 0dcaf56ed1
10 changed files with 93 additions and 33 deletions

View File

@@ -7,7 +7,7 @@
#include "NetworkConnection.h"
#include "tcpsockets/TCPStream.h"
#include "tcpsockets/NetworkStream.h"
#include "Log.h"
#include "raw_socket_istream.h"
#include "WireDecoder.h"
@@ -15,7 +15,7 @@
using namespace nt;
NetworkConnection::NetworkConnection(std::unique_ptr<TCPStream> stream,
NetworkConnection::NetworkConnection(std::unique_ptr<NetworkStream> stream,
HandshakeFunc handshake,
Message::GetEntryTypeFunc get_entry_type,
ProcessIncomingFunc process_incoming)
@@ -119,7 +119,7 @@ void NetworkConnection::WriteThreadMain() {
for (auto& msg : msgs) {
if (msg) msg->Write(encoder);
}
TCPStream::Error err;
NetworkStream::Error err;
if (!m_stream) break;
if (encoder.size() == 0) continue;
if (m_stream->send(encoder.data(), encoder.size(), &err) == 0) break;

View File

@@ -15,7 +15,7 @@
#include "support/ConcurrentQueue.h"
#include "Message.h"
class TCPStream;
class NetworkStream;
namespace nt {
@@ -33,7 +33,7 @@ class NetworkConnection {
typedef std::vector<std::shared_ptr<Message>> Outgoing;
typedef ConcurrentQueue<Outgoing> OutgoingQueue;
NetworkConnection(std::unique_ptr<TCPStream> stream,
NetworkConnection(std::unique_ptr<NetworkStream> stream,
HandshakeFunc handshake,
Message::GetEntryTypeFunc get_entry_type,
ProcessIncomingFunc process_incoming);
@@ -43,7 +43,7 @@ class NetworkConnection {
void Stop();
bool active() const { return m_active; }
TCPStream& stream() { return *m_stream; }
NetworkStream& stream() { return *m_stream; }
OutgoingQueue& outgoing() { return m_outgoing; }
unsigned int proto_rev() const { return m_proto_rev; }
@@ -62,7 +62,7 @@ class NetworkConnection {
void ReadThreadMain();
void WriteThreadMain();
std::unique_ptr<TCPStream> m_stream;
std::unique_ptr<NetworkStream> m_stream;
OutgoingQueue m_outgoing;
HandshakeFunc m_handshake;
Message::GetEntryTypeFunc m_get_entry_type;

View File

@@ -16,7 +16,7 @@ bool raw_socket_istream::read(void* data, std::size_t len) {
std::size_t pos = 0;
while (pos < len) {
TCPStream::Error err;
NetworkStream::Error err;
std::size_t count =
m_stream.receive(&cdata[pos], len - pos, &err, m_timeout);
if (count == 0) return false;

View File

@@ -10,20 +10,20 @@
#include "raw_istream.h"
#include "tcpsockets/TCPStream.h"
#include "tcpsockets/NetworkStream.h"
namespace nt {
class raw_socket_istream : public raw_istream {
public:
raw_socket_istream(TCPStream& stream, int timeout = 0)
raw_socket_istream(NetworkStream& stream, int timeout = 0)
: m_stream(stream), m_timeout(timeout) {}
virtual ~raw_socket_istream();
virtual bool read(void* data, std::size_t len);
virtual void close();
private:
TCPStream& m_stream;
NetworkStream& m_stream;
int m_timeout;
};

View File

@@ -0,0 +1,26 @@
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved. */
/* Open Source Software - may be modified and shared by FRC teams. The code */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project. */
/*----------------------------------------------------------------------------*/
#ifndef TCPSOCKETS_NETWORKACCEPTOR_H_
#define TCPSOCKETS_NETWORKACCEPTOR_H_
#include "NetworkStream.h"
class NetworkAcceptor {
public:
NetworkAcceptor() = default;
virtual ~NetworkAcceptor() = default;
virtual int start() = 0;
virtual void shutdown() = 0;
virtual std::unique_ptr<NetworkStream> accept() = 0;
NetworkAcceptor(const NetworkAcceptor&) = delete;
NetworkAcceptor& operator=(const NetworkAcceptor&) = delete;
};
#endif // TCPSOCKETS_NETWORKACCEPTOR_H_

View File

@@ -0,0 +1,38 @@
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved. */
/* Open Source Software - may be modified and shared by FRC teams. The code */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project. */
/*----------------------------------------------------------------------------*/
#ifndef TCPSOCKETS_NETWORKSTREAM_H_
#define TCPSOCKETS_NETWORKSTREAM_H_
#include <cstddef>
#include "llvm/StringRef.h"
class NetworkStream {
public:
NetworkStream() = default;
virtual ~NetworkStream() = default;
enum Error {
kConnectionClosed = 0,
kConnectionReset = -1,
kConnectionTimedOut = -2
};
virtual std::size_t send(const char* buffer, std::size_t len, Error* err) = 0;
virtual std::size_t receive(char* buffer, std::size_t len, Error* err,
int timeout = 0) = 0;
virtual void close() = 0;
virtual llvm::StringRef getPeerIP() const = 0;
virtual int getPeerPort() const = 0;
NetworkStream(const NetworkStream&) = delete;
NetworkStream& operator=(const NetworkStream&) = delete;
};
#endif // TCPSOCKETS_NETWORKSTREAM_H_

View File

@@ -83,7 +83,7 @@ void TCPAcceptor::shutdown() {
::shutdown(m_lsd, SHUT_RDWR);
}
std::unique_ptr<TCPStream> TCPAcceptor::accept() {
std::unique_ptr<NetworkStream> TCPAcceptor::accept() {
if (!m_listening) return nullptr;
struct sockaddr_in address;
@@ -94,5 +94,5 @@ std::unique_ptr<TCPStream> TCPAcceptor::accept() {
if (!m_shutdown) ERROR("accept() failed: " << strerror(errno));
return nullptr;
}
return std::unique_ptr<TCPStream>(new TCPStream(sd, &address));
return std::unique_ptr<NetworkStream>(new TCPStream(sd, &address));
}

View File

@@ -27,9 +27,10 @@
#include <memory>
#include <string>
#include "NetworkAcceptor.h"
#include "TCPStream.h"
class TCPAcceptor {
class TCPAcceptor : public NetworkAcceptor {
int m_lsd;
int m_port;
std::string m_address;
@@ -40,12 +41,9 @@ class TCPAcceptor {
TCPAcceptor(int port, const char* address);
~TCPAcceptor();
int start();
void shutdown();
std::unique_ptr<TCPStream> accept();
private:
TCPAcceptor() {}
int start() override;
void shutdown() override;
std::unique_ptr<NetworkStream> accept() override;
};
#endif

View File

@@ -79,6 +79,10 @@ void TCPStream::close() {
m_sd = -1;
}
llvm::StringRef TCPStream::getPeerIP() const { return m_peerIP; }
int TCPStream::getPeerPort() const { return m_peerPort; }
bool TCPStream::WaitForReadEvent(int timeout) {
fd_set sdset;
struct timeval tv;

View File

@@ -33,9 +33,9 @@
#include <sys/socket.h>
#endif
#include "llvm/StringRef.h"
#include "NetworkStream.h"
class TCPStream {
class TCPStream : public NetworkStream {
int m_sd;
std::string m_peerIP;
int m_peerPort;
@@ -46,19 +46,13 @@ class TCPStream {
~TCPStream();
enum Error {
kConnectionClosed = 0,
kConnectionReset = -1,
kConnectionTimedOut = -2
};
std::size_t send(const char* buffer, std::size_t len, Error* err);
std::size_t send(const char* buffer, std::size_t len, Error* err) override;
std::size_t receive(char* buffer, std::size_t len, Error* err,
int timeout = 0);
void close();
int timeout = 0) override;
void close() override;
llvm::StringRef getPeerIP() const { return m_peerIP; }
int getPeerPort() const { return m_peerPort; }
llvm::StringRef getPeerIP() const override;
int getPeerPort() const override;
TCPStream(const TCPStream& stream) = delete;
TCPStream& operator=(const TCPStream&) = delete;