Support client round robin to multiple server addresses.

Change-Id: If87dc64a485b1c8a340c5f6fa39ca09d40133e30
This commit is contained in:
Peter Johnson
2016-04-08 13:31:35 -07:00
parent b8ad1de33c
commit 5ac68f74d4
12 changed files with 175 additions and 25 deletions

View File

@@ -33,6 +33,20 @@ void Dispatcher::StartClient(const char* server_name, unsigned int port) {
});
}
void Dispatcher::StartClient(
ArrayRef<std::pair<StringRef, unsigned int>> servers) {
std::vector<Connector> connectors;
for (const auto& server : servers) {
std::string server_name(server.first);
unsigned int port = server.second;
connectors.emplace_back([=]() -> std::unique_ptr<NetworkStream> {
return TCPConnector::connect(server_name.c_str(),
static_cast<int>(port), 1);
});
}
DispatcherBase::StartClient(std::move(connectors));
}
Dispatcher::Dispatcher()
: Dispatcher(Storage::GetInstance(), Notifier::GetInstance()) {}
@@ -80,22 +94,26 @@ void DispatcherBase::StartServer(StringRef persist_filename,
m_clientserver_thread = std::thread(&Dispatcher::ServerThreadMain, this);
}
void DispatcherBase::StartClient(
std::function<std::unique_ptr<NetworkStream>()> connect) {
void DispatcherBase::StartClient(Connector connector) {
std::vector<Connector> connectors;
connectors.push_back(connector);
StartClient(std::move(connectors));
}
void DispatcherBase::StartClient(std::vector<Connector>&& connectors) {
{
std::lock_guard<std::mutex> lock(m_user_mutex);
if (m_active) return;
m_active = true;
m_client_connectors = std::move(connectors);
}
m_server = false;
using namespace std::placeholders;
m_storage.SetOutgoing(std::bind(&Dispatcher::QueueOutgoing, this, _1, _2, _3),
m_server);
m_dispatch_thread = std::thread(&Dispatcher::DispatchThreadMain, this);
m_clientserver_thread =
std::thread(&Dispatcher::ClientThreadMain, this, connect);
m_clientserver_thread = std::thread(&Dispatcher::ClientThreadMain, this);
}
void DispatcherBase::Stop() {
@@ -105,6 +123,10 @@ void DispatcherBase::Stop() {
m_flush_cv.notify_one();
// wake up client thread with a reconnect
{
std::lock_guard<std::mutex> lock(m_user_mutex);
m_client_connectors.resize(0);
}
ClientReconnect();
// wake up server thread by shutting down the socket
@@ -287,11 +309,20 @@ void DispatcherBase::ServerThreadMain() {
}
}
void DispatcherBase::ClientThreadMain(
std::function<std::unique_ptr<NetworkStream>()> connect) {
void DispatcherBase::ClientThreadMain() {
std::size_t i = 0;
while (m_active) {
// sleep between retries
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::this_thread::sleep_for(std::chrono::milliseconds(250));
Connector connect;
// get next server to connect to
{
std::lock_guard<std::mutex> lock(m_user_mutex);
if (m_client_connectors.empty()) continue;
if (i >= m_client_connectors.size()) i = 0;
connect = m_client_connectors[i++];
}
// try to connect (with timeout)
DEBUG("client trying to connect");