Add INetworkConnection interface for unit testing.

This commit is contained in:
Peter Johnson
2017-09-04 22:01:59 -07:00
parent 7c1d2f4bc4
commit cedbafeb28
12 changed files with 143 additions and 59 deletions

View File

@@ -16,6 +16,7 @@
#include "IConnectionNotifier.h"
#include "Log.h"
#include "IStorage.h"
#include "NetworkConnection.h"
using namespace nt;
@@ -178,7 +179,7 @@ void DispatcherBase::Stop() {
if (m_dispatch_thread.joinable()) m_dispatch_thread.join();
if (m_clientserver_thread.joinable()) m_clientserver_thread.join();
std::vector<std::shared_ptr<NetworkConnection>> conns;
std::vector<std::shared_ptr<INetworkConnection>> conns;
{
std::lock_guard<std::mutex> lock(m_user_mutex);
conns.swap(m_connections);
@@ -315,8 +316,8 @@ void DispatcherBase::DispatchThreadMain() {
}
void DispatcherBase::QueueOutgoing(std::shared_ptr<Message> msg,
NetworkConnection* only,
NetworkConnection* except) {
INetworkConnection* only,
INetworkConnection* except) {
std::lock_guard<std::mutex> user_lock(m_user_mutex);
for (auto& conn : m_connections) {
if (conn.get() == except) continue;

View File

@@ -15,12 +15,13 @@
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "llvm/StringRef.h"
#include "IDispatcher.h"
#include "NetworkConnection.h"
#include "INetworkConnection.h"
namespace wpi {
class Logger;
@@ -32,6 +33,7 @@ namespace nt {
class IConnectionNotifier;
class IStorage;
class NetworkConnection;
class DispatcherBase : public IDispatcher {
friend class DispatcherTest;
@@ -79,8 +81,8 @@ class DispatcherBase : public IDispatcher {
void ClientReconnect(unsigned int proto_rev = 0x0300);
void QueueOutgoing(std::shared_ptr<Message> msg, NetworkConnection* only,
NetworkConnection* except) override;
void QueueOutgoing(std::shared_ptr<Message> msg, INetworkConnection* only,
INetworkConnection* except) override;
IStorage& m_storage;
IConnectionNotifier& m_notifier;
@@ -96,7 +98,7 @@ class DispatcherBase : public IDispatcher {
// Mutex for user-accessible items
mutable std::mutex m_user_mutex;
std::vector<std::shared_ptr<NetworkConnection>> m_connections;
std::vector<std::shared_ptr<INetworkConnection>> m_connections;
std::string m_identity;
std::atomic_bool m_active; // set to false to terminate threads

View File

@@ -14,7 +14,7 @@
namespace nt {
class NetworkConnection;
class INetworkConnection;
// Interface for generation of outgoing messages to break a dependency loop
// between Storage and Dispatcher.
@@ -25,8 +25,8 @@ class IDispatcher {
IDispatcher& operator=(const IDispatcher&) = delete;
virtual ~IDispatcher() = default;
virtual void QueueOutgoing(std::shared_ptr<Message> msg,
NetworkConnection* only,
NetworkConnection* except) = 0;
INetworkConnection* only,
INetworkConnection* except) = 0;
};
} // namespace nt

View File

@@ -0,0 +1,41 @@
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2017. All Rights Reserved. */
/* Open Source Software - may be modified and shared by FRC teams. The code */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project. */
/*----------------------------------------------------------------------------*/
#ifndef NT_INETWORKCONNECTION_H_
#define NT_INETWORKCONNECTION_H_
#include <memory>
#include "Message.h"
#include "ntcore_cpp.h"
namespace nt {
class INetworkConnection {
public:
enum State { kCreated, kInit, kHandshake, kSynchronized, kActive, kDead };
INetworkConnection() = default;
INetworkConnection(const INetworkConnection&) = delete;
INetworkConnection& operator=(const INetworkConnection&) = delete;
virtual ~INetworkConnection() = default;
virtual ConnectionInfo info() const = 0;
virtual void QueueOutgoing(std::shared_ptr<Message> msg) = 0;
virtual void PostOutgoing(bool keep_alive) = 0;
virtual unsigned int proto_rev() const = 0;
virtual void set_proto_rev(unsigned int proto_rev) = 0;
virtual State state() const = 0;
virtual void set_state(State state) = 0;
};
} // namespace nt
#endif // NT_INETWORKCONNECTION_H_

View File

@@ -21,7 +21,7 @@
namespace nt {
class IDispatcher;
class NetworkConnection;
class INetworkConnection;
class IStorage {
public:
@@ -42,12 +42,13 @@ class IStorage {
virtual NT_Type GetMessageEntryType(unsigned int id) const = 0;
virtual void ProcessIncoming(std::shared_ptr<Message> msg,
NetworkConnection* conn,
std::weak_ptr<NetworkConnection> conn_weak) = 0;
INetworkConnection* conn,
std::weak_ptr<INetworkConnection> conn_weak) = 0;
virtual void GetInitialAssignments(
NetworkConnection& conn, std::vector<std::shared_ptr<Message>>* msgs) = 0;
INetworkConnection& conn,
std::vector<std::shared_ptr<Message>>* msgs) = 0;
virtual void ApplyInitialAssignments(
NetworkConnection& conn, llvm::ArrayRef<std::shared_ptr<Message>> msgs,
INetworkConnection& conn, llvm::ArrayRef<std::shared_ptr<Message>> msgs,
bool new_server, std::vector<std::shared_ptr<Message>>* out_msgs) = 0;
// Filename-based save/load functions. Used both by periodic saves and

View File

@@ -97,6 +97,12 @@ ConnectionInfo NetworkConnection::info() const {
m_last_update, m_proto_rev};
}
unsigned int NetworkConnection::proto_rev() const { return m_proto_rev; }
void NetworkConnection::set_proto_rev(unsigned int proto_rev) {
m_proto_rev = proto_rev;
}
NetworkConnection::State NetworkConnection::state() const {
std::lock_guard<std::mutex> lock(m_state_mutex);
return m_state;

View File

@@ -14,6 +14,7 @@
#include <thread>
#include "support/ConcurrentQueue.h"
#include "INetworkConnection.h"
#include "Message.h"
#include "ntcore_cpp.h"
@@ -26,10 +27,8 @@ namespace nt {
class IConnectionNotifier;
class NetworkConnection {
class NetworkConnection : public INetworkConnection {
public:
enum State { kCreated, kInit, kHandshake, kSynchronized, kActive, kDead };
typedef std::function<bool(
NetworkConnection& conn,
std::function<std::shared_ptr<Message>()> get_msg,
@@ -56,21 +55,21 @@ class NetworkConnection {
void Start();
void Stop();
ConnectionInfo info() const;
ConnectionInfo info() const override;
bool active() const { return m_active; }
wpi::NetworkStream& stream() { return *m_stream; }
void QueueOutgoing(std::shared_ptr<Message> msg);
void PostOutgoing(bool keep_alive);
void QueueOutgoing(std::shared_ptr<Message> msg) override;
void PostOutgoing(bool keep_alive) override;
unsigned int uid() const { return m_uid; }
unsigned int proto_rev() const { return m_proto_rev; }
void set_proto_rev(unsigned int proto_rev) { m_proto_rev = proto_rev; }
unsigned int proto_rev() const override;
void set_proto_rev(unsigned int proto_rev) override;
State state() const;
void set_state(State state);
State state() const override;
void set_state(State state) override;
std::string remote_id() const;
void set_remote_id(StringRef remote_id);

View File

@@ -12,9 +12,9 @@
#include "Handle.h"
#include "IDispatcher.h"
#include "IEntryNotifier.h"
#include "INetworkConnection.h"
#include "IRpcServer.h"
#include "Log.h"
#include "NetworkConnection.h"
using namespace nt;
@@ -46,8 +46,8 @@ NT_Type Storage::GetMessageEntryType(unsigned int id) const {
}
void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
NetworkConnection* conn,
std::weak_ptr<NetworkConnection> conn_weak) {
INetworkConnection* conn,
std::weak_ptr<INetworkConnection> conn_weak) {
switch (msg->type()) {
case Message::kKeepAlive:
break; // ignore
@@ -85,7 +85,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
}
void Storage::ProcessIncomingEntryAssign(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
INetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
unsigned int id = msg->id();
StringRef name = msg->str();
@@ -209,7 +209,7 @@ void Storage::ProcessIncomingEntryAssign(std::shared_ptr<Message> msg,
}
void Storage::ProcessIncomingEntryUpdate(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
INetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
@@ -246,7 +246,7 @@ void Storage::ProcessIncomingEntryUpdate(std::shared_ptr<Message> msg,
}
void Storage::ProcessIncomingFlagsUpdate(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
INetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
@@ -270,7 +270,7 @@ void Storage::ProcessIncomingFlagsUpdate(std::shared_ptr<Message> msg,
}
void Storage::ProcessIncomingEntryDelete(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
INetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
@@ -294,7 +294,7 @@ void Storage::ProcessIncomingEntryDelete(std::shared_ptr<Message> msg,
}
void Storage::ProcessIncomingClearEntries(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
INetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
// update local
DeleteAllEntriesImpl(false);
@@ -309,8 +309,8 @@ void Storage::ProcessIncomingClearEntries(std::shared_ptr<Message> msg,
}
void Storage::ProcessIncomingExecuteRpc(
std::shared_ptr<Message> msg, NetworkConnection* conn,
std::weak_ptr<NetworkConnection> conn_weak) {
std::shared_ptr<Message> msg, INetworkConnection* conn,
std::weak_ptr<INetworkConnection> conn_weak) {
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_server) return; // only process on server
unsigned int id = msg->id();
@@ -349,7 +349,7 @@ void Storage::ProcessIncomingExecuteRpc(
}
void Storage::ProcessIncomingRpcResponse(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
INetworkConnection* conn) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_server) return; // only process on client
unsigned int id = msg->id();
@@ -372,9 +372,9 @@ void Storage::ProcessIncomingRpcResponse(std::shared_ptr<Message> msg,
}
void Storage::GetInitialAssignments(
NetworkConnection& conn, std::vector<std::shared_ptr<Message>>* msgs) {
INetworkConnection& conn, std::vector<std::shared_ptr<Message>>* msgs) {
std::lock_guard<std::mutex> lock(m_mutex);
conn.set_state(NetworkConnection::kSynchronized);
conn.set_state(INetworkConnection::kSynchronized);
for (auto& i : m_entries) {
Entry* entry = i.getValue();
msgs->emplace_back(Message::EntryAssign(i.getKey(), entry->id,
@@ -384,12 +384,12 @@ void Storage::GetInitialAssignments(
}
void Storage::ApplyInitialAssignments(
NetworkConnection& conn, llvm::ArrayRef<std::shared_ptr<Message>> msgs,
INetworkConnection& conn, llvm::ArrayRef<std::shared_ptr<Message>> msgs,
bool new_server, std::vector<std::shared_ptr<Message>>* out_msgs) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_server) return; // should not do this on server
conn.set_state(NetworkConnection::kSynchronized);
conn.set_state(INetworkConnection::kSynchronized);
std::vector<std::shared_ptr<Message>> update_msgs;

View File

@@ -36,6 +36,7 @@ class raw_istream;
namespace nt {
class IEntryNotifier;
class INetworkConnection;
class IRpcServer;
class IStorageTest;
@@ -60,13 +61,13 @@ class Storage : public IStorage {
// message itself). Not used in wire protocol 3.0.
NT_Type GetMessageEntryType(unsigned int id) const override;
void ProcessIncoming(std::shared_ptr<Message> msg, NetworkConnection* conn,
std::weak_ptr<NetworkConnection> conn_weak) override;
void ProcessIncoming(std::shared_ptr<Message> msg, INetworkConnection* conn,
std::weak_ptr<INetworkConnection> conn_weak) override;
void GetInitialAssignments(
NetworkConnection& conn,
INetworkConnection& conn,
std::vector<std::shared_ptr<Message>>* msgs) override;
void ApplyInitialAssignments(
NetworkConnection& conn, llvm::ArrayRef<std::shared_ptr<Message>> msgs,
INetworkConnection& conn, llvm::ArrayRef<std::shared_ptr<Message>> msgs,
bool new_server,
std::vector<std::shared_ptr<Message>>* out_msgs) override;
@@ -197,20 +198,20 @@ class Storage : public IStorage {
wpi::Logger& m_logger;
void ProcessIncomingEntryAssign(std::shared_ptr<Message> msg,
NetworkConnection* conn);
INetworkConnection* conn);
void ProcessIncomingEntryUpdate(std::shared_ptr<Message> msg,
NetworkConnection* conn);
INetworkConnection* conn);
void ProcessIncomingFlagsUpdate(std::shared_ptr<Message> msg,
NetworkConnection* conn);
INetworkConnection* conn);
void ProcessIncomingEntryDelete(std::shared_ptr<Message> msg,
NetworkConnection* conn);
INetworkConnection* conn);
void ProcessIncomingClearEntries(std::shared_ptr<Message> msg,
NetworkConnection* conn);
INetworkConnection* conn);
void ProcessIncomingExecuteRpc(std::shared_ptr<Message> msg,
NetworkConnection* conn,
std::weak_ptr<NetworkConnection> conn_weak);
INetworkConnection* conn,
std::weak_ptr<INetworkConnection> conn_weak);
void ProcessIncomingRpcResponse(std::shared_ptr<Message> msg,
NetworkConnection* conn);
INetworkConnection* conn);
bool GetPersistentEntries(
bool periodic,