From 7c1d2f4bc40129ffd6687977ee702cd9e2eaee86 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Sun, 13 Aug 2017 10:11:29 -0700 Subject: [PATCH] Improve client connection synchronization behavior. The original synchronization behavior was troublesome for two reasons: - It had unpredictable behavior for updated values - It brought back to life deleted values Instead of relying on the server to inform the client regarding reconnections, the client keeps track of what values have been modified by user code on the client. When the client connects to the server, the following occurs. For entries that have been modified by user code on the client: - If the entry is not persistent, the server value is overwritten with the client value - If the entry does not exist on the server, the client sends an assignment to the server to recreate it on the server For entries that have not been modified by user code on the client: - The client value is overwritten with the server value - If the entry does not exist on the server, the client deletes the entry Fixes #8. --- src/main/native/cpp/Storage.cpp | 50 ++++++++++++++++++++++----------- src/main/native/cpp/Storage.h | 6 ++++ 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/src/main/native/cpp/Storage.cpp b/src/main/native/cpp/Storage.cpp index 454ec33bad..34abfcbe6f 100644 --- a/src/main/native/cpp/Storage.cpp +++ b/src/main/native/cpp/Storage.cpp @@ -426,10 +426,10 @@ void Storage::ApplyInitialAssignments( m_notifier.NotifyEntry(entry->local_id, name, entry->value, NT_NOTIFY_NEW); } else { - // if reconnect and sequence number not higher than local, then we - // don't update the local value and instead send it back to the server - // as an update message - if (!new_server && seq_num <= entry->seq_num) { + // if we have written the value locally and the value is not persistent, + // then we don't update the local value and instead send it back to the + // server as an update message + if (entry->local_write && !entry->IsPersistent()) { update_msgs.emplace_back(Message::EntryUpdate( entry->id, entry->seq_num.value(), entry->value)); } else { @@ -453,14 +453,21 @@ void Storage::ApplyInitialAssignments( m_idmap[id] = entry; } - // generate assign messages for unassigned local entries - for (auto& i : m_entries) { - Entry* entry = i.getValue(); - if (entry->id != 0xffff) continue; - out_msgs->emplace_back(Message::EntryAssign(entry->name, entry->id, - entry->seq_num.value(), - entry->value, entry->flags)); - } + // delete or generate assign messages for unassigned local entries + DeleteAllEntriesImpl(false, [&](Entry* entry) -> bool { + // was assigned by the server, don't delete + if (entry->id != 0xffff) return false; + // if we have written the value locally, we send an assign message to the + // server instead of deleting + if (entry->local_write) { + out_msgs->emplace_back( + Message::EntryAssign(entry->name, entry->id, entry->seq_num.value(), + entry->value, entry->flags)); + return false; + } + // otherwise delete + return true; + }); auto dispatcher = m_dispatcher; lock.unlock(); for (auto& msg : update_msgs) @@ -561,6 +568,9 @@ void Storage::SetEntryValueImpl(Entry* entry, std::shared_ptr value, m_notifier.NotifyEntry(entry->local_id, entry->name, value, NT_NOTIFY_UPDATE | (local ? NT_NOTIFY_LOCAL : 0)); + // remember local changes + if (local) entry->local_write = true; + // generate message if (!m_dispatcher || (!local && !m_server)) return; auto dispatcher = m_dispatcher; @@ -678,10 +688,11 @@ void Storage::DeleteEntryImpl(Entry* entry, EntriesMap::iterator it, if (it != m_entries.end()) m_entries.erase(it); if (id < m_idmap.size()) m_idmap[id] = nullptr; - // empty the value and reset id + // empty the value and reset id and local_write flag std::shared_ptr old_value; old_value.swap(entry->value); entry->id = 0xffff; + entry->local_write = false; // remove RPC if there was one if (entry->rpc_uid != UINT_MAX) { @@ -708,21 +719,22 @@ void Storage::DeleteEntryImpl(Entry* entry, EntriesMap::iterator it, } } -void Storage::DeleteAllEntriesImpl(bool local) { +template +void Storage::DeleteAllEntriesImpl(bool local, F should_delete) { if (m_entries.empty()) return; - // only delete non-persistent values // can't erase without invalidating iterators, so build a new map EntriesMap entries; for (auto& i : m_entries) { Entry* entry = i.getValue(); - if (!entry->IsPersistent()) { + if (should_delete(entry)) { // notify it's being deleted m_notifier.NotifyEntry(entry->local_id, i.getKey(), entry->value, NT_NOTIFY_DELETE | (local ? NT_NOTIFY_LOCAL : 0)); // remove it from idmap if (entry->id < m_idmap.size()) m_idmap[entry->id] = nullptr; entry->id = 0xffff; + entry->local_write = false; entry->value.reset(); continue; } @@ -733,6 +745,12 @@ void Storage::DeleteAllEntriesImpl(bool local) { m_entries.swap(entries); } +void Storage::DeleteAllEntriesImpl(bool local) { + // only delete non-persistent values + DeleteAllEntriesImpl(local, + [](Entry* entry) { return !entry->IsPersistent(); }); +} + void Storage::DeleteAllEntries() { std::unique_lock lock(m_mutex); if (m_entries.empty()) return; diff --git a/src/main/native/cpp/Storage.h b/src/main/native/cpp/Storage.h index 1a5538f314..611ed2747a 100644 --- a/src/main/native/cpp/Storage.h +++ b/src/main/native/cpp/Storage.h @@ -156,6 +156,10 @@ class Storage : public IStorage { // Sequence number for update resolution. SequenceNumber seq_num; + // If value has been written locally. Used during initial handshake + // on client to determine whether or not to accept remote changes. + bool local_write{false}; + // RPC handle. unsigned int rpc_uid{UINT_MAX}; @@ -220,6 +224,8 @@ class Storage : public IStorage { std::unique_lock& lock, bool local); // Must be called with m_mutex held + template + void DeleteAllEntriesImpl(bool local, F should_delete); void DeleteAllEntriesImpl(bool local); Entry* GetOrNew(StringRef name, bool* is_new = nullptr); };