Implement remote procedure calls.

This commit is contained in:
Peter Johnson
2015-08-13 13:12:15 -07:00
parent e199e3571b
commit 8d7cdeabbf
13 changed files with 892 additions and 261 deletions

View File

@@ -20,9 +20,15 @@ using namespace nt;
ATOMIC_STATIC_INIT(Storage)
Storage::Storage(Notifier& notifier) : m_notifier(notifier) {}
Storage::Storage(Notifier& notifier, RpcServer& rpc_server)
: m_notifier(notifier), m_rpc_server(rpc_server) {
m_terminating = false;
}
Storage::~Storage() {}
Storage::~Storage() {
m_terminating = true;
m_rpc_results_cond.notify_all();
}
void Storage::SetOutgoing(QueueOutgoingFunc queue_outgoing, bool server) {
std::lock_guard<std::mutex> lock(m_mutex);
@@ -43,7 +49,8 @@ NT_Type Storage::GetEntryType(unsigned int id) const {
}
void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
NetworkConnection* conn) {
NetworkConnection* conn,
std::weak_ptr<NetworkConnection> conn_weak) {
std::unique_lock<std::mutex> lock(m_mutex);
switch (msg->type()) {
case Message::kKeepAlive:
@@ -267,10 +274,36 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
}
break;
}
case Message::kExecuteRpc:
case Message::kRpcResponse:
// TODO
case Message::kExecuteRpc: {
if (!m_server) return; // only process on server
unsigned int id = msg->id();
if (id >= m_idmap.size() || !m_idmap[id]) {
// ignore call to non-existent RPC
// this can happen due to deleted entries
lock.unlock();
DEBUG("received RPC call to unknown entry");
return;
}
Entry* entry = m_idmap[id];
if (!entry->value->IsRpc()) {
lock.unlock();
DEBUG("received RPC call to non-RPC entry");
return;
}
m_rpc_server.ProcessRpc(entry->name, msg, entry->rpc_callback,
conn->uid(), [=](std::shared_ptr<Message> msg) {
auto c = conn_weak.lock();
if (c) c->QueueOutgoing(msg);
});
break;
}
case Message::kRpcResponse: {
if (m_server) return; // only process on client
m_rpc_results.insert(std::make_pair(
std::make_pair(msg->id(), msg->seq_num_uid()), msg->str()));
m_rpc_results_cond.notify_all();
break;
}
default:
break;
}
@@ -478,10 +511,11 @@ void Storage::DeleteEntry(StringRef name) {
if (i == m_entries.end()) return;
Entry* entry = i->getValue().get();
unsigned int id = entry->id;
bool had_value = entry->value != nullptr;
m_entries.erase(i); // erase from map
if (id < m_idmap.size()) m_idmap[id] = nullptr;
if (!entry->value) return;
if (!had_value) return;
// if it had a value, generate message
// don't send an update if we don't have an assigned id yet
@@ -1003,3 +1037,136 @@ next_line:
return true;
}
void Storage::CreateRpc(StringRef name, StringRef def, RpcCallback callback) {
if (name.empty() || def.empty() || !callback) return;
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_server) return; // only server can create RPCs
auto& new_entry = m_entries[name];
if (!new_entry) new_entry.reset(new Entry(name));
Entry* entry = new_entry.get();
auto old_value = entry->value;
auto value = Value::MakeRpc(def);
entry->value = value;
// set up the new callback
entry->rpc_callback = callback;
// start the RPC server
if (!m_rpc_server.active()) m_rpc_server.Start();
if (old_value && *old_value == *value) return;
// assign an id if it doesn't have one
if (entry->id == 0xffff) {
unsigned int id = m_idmap.size();
entry->id = id;
m_idmap.push_back(entry);
}
// generate message
if (!m_queue_outgoing) return;
auto queue_outgoing = m_queue_outgoing;
if (!old_value || old_value->type() != value->type()) {
++entry->seq_num;
auto msg = Message::EntryAssign(name, entry->id, entry->seq_num.value(),
value, entry->flags);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
} else {
++entry->seq_num;
auto msg = Message::EntryUpdate(entry->id, entry->seq_num.value(), value);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
}
void Storage::CreatePolledRpc(StringRef name, StringRef def) {
if (name.empty() || def.empty()) return;
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_server) return; // only server can create RPCs
auto& new_entry = m_entries[name];
if (!new_entry) new_entry.reset(new Entry(name));
Entry* entry = new_entry.get();
auto old_value = entry->value;
auto value = Value::MakeRpc(def);
entry->value = value;
// a nullptr callback indicates a polled RPC
entry->rpc_callback = nullptr;
if (old_value && *old_value == *value) return;
// assign an id if it doesn't have one
if (entry->id == 0xffff) {
unsigned int id = m_idmap.size();
entry->id = id;
m_idmap.push_back(entry);
}
// generate message
if (!m_queue_outgoing) return;
auto queue_outgoing = m_queue_outgoing;
if (!old_value || old_value->type() != value->type()) {
++entry->seq_num;
auto msg = Message::EntryAssign(name, entry->id, entry->seq_num.value(),
value, entry->flags);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
} else {
++entry->seq_num;
auto msg = Message::EntryUpdate(entry->id, entry->seq_num.value(), value);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
}
unsigned int Storage::CallRpc(StringRef name, StringRef params) {
std::unique_lock<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
if (i == m_entries.end()) return 0;
auto& entry = i->getValue();
if (!entry->value->IsRpc()) return 0;
++entry->rpc_call_uid;
if (entry->rpc_call_uid > 0xffff) entry->rpc_call_uid = 0;
unsigned int combined_uid = (entry->id << 16) | entry->rpc_call_uid;
auto msg = Message::ExecuteRpc(entry->id, entry->rpc_call_uid, params);
if (m_server) {
// RPCs are unlikely to be used locally on the server, but handle it
// gracefully anyway.
auto rpc_callback = entry->rpc_callback;
lock.unlock();
m_rpc_server.ProcessRpc(
name, msg, rpc_callback, 0xffffU, [this](std::shared_ptr<Message> msg) {
std::lock_guard<std::mutex> lock(m_mutex);
m_rpc_results.insert(std::make_pair(
std::make_pair(msg->id(), msg->seq_num_uid()), msg->str()));
m_rpc_results_cond.notify_all();
});
} else {
auto queue_outgoing = m_queue_outgoing;
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
return combined_uid;
}
bool Storage::GetRpcResult(bool blocking, unsigned int call_uid,
std::string* result) {
std::unique_lock<std::mutex> lock(m_mutex);
for (;;) {
auto i =
m_rpc_results.find(std::make_pair(call_uid >> 16, call_uid & 0xffff));
if (i == m_rpc_results.end()) {
if (!blocking || m_terminating) return false;
m_rpc_results_cond.wait(lock);
continue;
}
result->swap(i->getSecond());
m_rpc_results.erase(i);
return true;
}
}