Improve client connection synchronization behavior.

The original synchronization behavior was troublesome for two reasons:
- It had unpredictable behavior for updated values
- It brought back to life deleted values

Instead of relying on the server to inform the client regarding reconnections,
the client keeps track of what values have been modified by user code on the
client.  When the client connects to the server, the following occurs.

For entries that have been modified by user code on the client:
- If the entry is not persistent, the server value is overwritten with the
  client value
- If the entry does not exist on the server, the client sends an assignment
  to the server to recreate it on the server

For entries that have not been modified by user code on the client:
- The client value is overwritten with the server value
- If the entry does not exist on the server, the client deletes the entry

Fixes #8.
This commit is contained in:
Peter Johnson
2017-08-13 10:11:29 -07:00
parent 8099d6dbd7
commit 7c1d2f4bc4
2 changed files with 40 additions and 16 deletions

View File

@@ -426,10 +426,10 @@ void Storage::ApplyInitialAssignments(
m_notifier.NotifyEntry(entry->local_id, name, entry->value,
NT_NOTIFY_NEW);
} else {
// if reconnect and sequence number not higher than local, then we
// don't update the local value and instead send it back to the server
// as an update message
if (!new_server && seq_num <= entry->seq_num) {
// if we have written the value locally and the value is not persistent,
// then we don't update the local value and instead send it back to the
// server as an update message
if (entry->local_write && !entry->IsPersistent()) {
update_msgs.emplace_back(Message::EntryUpdate(
entry->id, entry->seq_num.value(), entry->value));
} else {
@@ -453,14 +453,21 @@ void Storage::ApplyInitialAssignments(
m_idmap[id] = entry;
}
// generate assign messages for unassigned local entries
for (auto& i : m_entries) {
Entry* entry = i.getValue();
if (entry->id != 0xffff) continue;
out_msgs->emplace_back(Message::EntryAssign(entry->name, entry->id,
entry->seq_num.value(),
entry->value, entry->flags));
}
// delete or generate assign messages for unassigned local entries
DeleteAllEntriesImpl(false, [&](Entry* entry) -> bool {
// was assigned by the server, don't delete
if (entry->id != 0xffff) return false;
// if we have written the value locally, we send an assign message to the
// server instead of deleting
if (entry->local_write) {
out_msgs->emplace_back(
Message::EntryAssign(entry->name, entry->id, entry->seq_num.value(),
entry->value, entry->flags));
return false;
}
// otherwise delete
return true;
});
auto dispatcher = m_dispatcher;
lock.unlock();
for (auto& msg : update_msgs)
@@ -561,6 +568,9 @@ void Storage::SetEntryValueImpl(Entry* entry, std::shared_ptr<Value> value,
m_notifier.NotifyEntry(entry->local_id, entry->name, value,
NT_NOTIFY_UPDATE | (local ? NT_NOTIFY_LOCAL : 0));
// remember local changes
if (local) entry->local_write = true;
// generate message
if (!m_dispatcher || (!local && !m_server)) return;
auto dispatcher = m_dispatcher;
@@ -678,10 +688,11 @@ void Storage::DeleteEntryImpl(Entry* entry, EntriesMap::iterator it,
if (it != m_entries.end()) m_entries.erase(it);
if (id < m_idmap.size()) m_idmap[id] = nullptr;
// empty the value and reset id
// empty the value and reset id and local_write flag
std::shared_ptr<Value> old_value;
old_value.swap(entry->value);
entry->id = 0xffff;
entry->local_write = false;
// remove RPC if there was one
if (entry->rpc_uid != UINT_MAX) {
@@ -708,21 +719,22 @@ void Storage::DeleteEntryImpl(Entry* entry, EntriesMap::iterator it,
}
}
void Storage::DeleteAllEntriesImpl(bool local) {
template <typename F>
void Storage::DeleteAllEntriesImpl(bool local, F should_delete) {
if (m_entries.empty()) return;
// only delete non-persistent values
// can't erase without invalidating iterators, so build a new map
EntriesMap entries;
for (auto& i : m_entries) {
Entry* entry = i.getValue();
if (!entry->IsPersistent()) {
if (should_delete(entry)) {
// notify it's being deleted
m_notifier.NotifyEntry(entry->local_id, i.getKey(), entry->value,
NT_NOTIFY_DELETE | (local ? NT_NOTIFY_LOCAL : 0));
// remove it from idmap
if (entry->id < m_idmap.size()) m_idmap[entry->id] = nullptr;
entry->id = 0xffff;
entry->local_write = false;
entry->value.reset();
continue;
}
@@ -733,6 +745,12 @@ void Storage::DeleteAllEntriesImpl(bool local) {
m_entries.swap(entries);
}
void Storage::DeleteAllEntriesImpl(bool local) {
// only delete non-persistent values
DeleteAllEntriesImpl(local,
[](Entry* entry) { return !entry->IsPersistent(); });
}
void Storage::DeleteAllEntries() {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_entries.empty()) return;

View File

@@ -156,6 +156,10 @@ class Storage : public IStorage {
// Sequence number for update resolution.
SequenceNumber seq_num;
// If value has been written locally. Used during initial handshake
// on client to determine whether or not to accept remote changes.
bool local_write{false};
// RPC handle.
unsigned int rpc_uid{UINT_MAX};
@@ -220,6 +224,8 @@ class Storage : public IStorage {
std::unique_lock<std::mutex>& lock, bool local);
// Must be called with m_mutex held
template <typename F>
void DeleteAllEntriesImpl(bool local, F should_delete);
void DeleteAllEntriesImpl(bool local);
Entry* GetOrNew(StringRef name, bool* is_new = nullptr);
};