diff --git a/src/main/native/cpp/Storage.cpp b/src/main/native/cpp/Storage.cpp index 454ec33bad..34abfcbe6f 100644 --- a/src/main/native/cpp/Storage.cpp +++ b/src/main/native/cpp/Storage.cpp @@ -426,10 +426,10 @@ void Storage::ApplyInitialAssignments( m_notifier.NotifyEntry(entry->local_id, name, entry->value, NT_NOTIFY_NEW); } else { - // if reconnect and sequence number not higher than local, then we - // don't update the local value and instead send it back to the server - // as an update message - if (!new_server && seq_num <= entry->seq_num) { + // if we have written the value locally and the value is not persistent, + // then we don't update the local value and instead send it back to the + // server as an update message + if (entry->local_write && !entry->IsPersistent()) { update_msgs.emplace_back(Message::EntryUpdate( entry->id, entry->seq_num.value(), entry->value)); } else { @@ -453,14 +453,21 @@ void Storage::ApplyInitialAssignments( m_idmap[id] = entry; } - // generate assign messages for unassigned local entries - for (auto& i : m_entries) { - Entry* entry = i.getValue(); - if (entry->id != 0xffff) continue; - out_msgs->emplace_back(Message::EntryAssign(entry->name, entry->id, - entry->seq_num.value(), - entry->value, entry->flags)); - } + // delete or generate assign messages for unassigned local entries + DeleteAllEntriesImpl(false, [&](Entry* entry) -> bool { + // was assigned by the server, don't delete + if (entry->id != 0xffff) return false; + // if we have written the value locally, we send an assign message to the + // server instead of deleting + if (entry->local_write) { + out_msgs->emplace_back( + Message::EntryAssign(entry->name, entry->id, entry->seq_num.value(), + entry->value, entry->flags)); + return false; + } + // otherwise delete + return true; + }); auto dispatcher = m_dispatcher; lock.unlock(); for (auto& msg : update_msgs) @@ -561,6 +568,9 @@ void Storage::SetEntryValueImpl(Entry* entry, std::shared_ptr value, m_notifier.NotifyEntry(entry->local_id, entry->name, value, NT_NOTIFY_UPDATE | (local ? NT_NOTIFY_LOCAL : 0)); + // remember local changes + if (local) entry->local_write = true; + // generate message if (!m_dispatcher || (!local && !m_server)) return; auto dispatcher = m_dispatcher; @@ -678,10 +688,11 @@ void Storage::DeleteEntryImpl(Entry* entry, EntriesMap::iterator it, if (it != m_entries.end()) m_entries.erase(it); if (id < m_idmap.size()) m_idmap[id] = nullptr; - // empty the value and reset id + // empty the value and reset id and local_write flag std::shared_ptr old_value; old_value.swap(entry->value); entry->id = 0xffff; + entry->local_write = false; // remove RPC if there was one if (entry->rpc_uid != UINT_MAX) { @@ -708,21 +719,22 @@ void Storage::DeleteEntryImpl(Entry* entry, EntriesMap::iterator it, } } -void Storage::DeleteAllEntriesImpl(bool local) { +template +void Storage::DeleteAllEntriesImpl(bool local, F should_delete) { if (m_entries.empty()) return; - // only delete non-persistent values // can't erase without invalidating iterators, so build a new map EntriesMap entries; for (auto& i : m_entries) { Entry* entry = i.getValue(); - if (!entry->IsPersistent()) { + if (should_delete(entry)) { // notify it's being deleted m_notifier.NotifyEntry(entry->local_id, i.getKey(), entry->value, NT_NOTIFY_DELETE | (local ? NT_NOTIFY_LOCAL : 0)); // remove it from idmap if (entry->id < m_idmap.size()) m_idmap[entry->id] = nullptr; entry->id = 0xffff; + entry->local_write = false; entry->value.reset(); continue; } @@ -733,6 +745,12 @@ void Storage::DeleteAllEntriesImpl(bool local) { m_entries.swap(entries); } +void Storage::DeleteAllEntriesImpl(bool local) { + // only delete non-persistent values + DeleteAllEntriesImpl(local, + [](Entry* entry) { return !entry->IsPersistent(); }); +} + void Storage::DeleteAllEntries() { std::unique_lock lock(m_mutex); if (m_entries.empty()) return; diff --git a/src/main/native/cpp/Storage.h b/src/main/native/cpp/Storage.h index 1a5538f314..611ed2747a 100644 --- a/src/main/native/cpp/Storage.h +++ b/src/main/native/cpp/Storage.h @@ -156,6 +156,10 @@ class Storage : public IStorage { // Sequence number for update resolution. SequenceNumber seq_num; + // If value has been written locally. Used during initial handshake + // on client to determine whether or not to accept remote changes. + bool local_write{false}; + // RPC handle. unsigned int rpc_uid{UINT_MAX}; @@ -220,6 +224,8 @@ class Storage : public IStorage { std::unique_lock& lock, bool local); // Must be called with m_mutex held + template + void DeleteAllEntriesImpl(bool local, F should_delete); void DeleteAllEntriesImpl(bool local); Entry* GetOrNew(StringRef name, bool* is_new = nullptr); };