diff --git a/src/Dispatcher.cpp b/src/Dispatcher.cpp index 5faac964b9..8fcfd4772e 100644 --- a/src/Dispatcher.cpp +++ b/src/Dispatcher.cpp @@ -116,8 +116,6 @@ void Dispatcher::Flush() { } void Dispatcher::DispatchThreadMain() { - Storage& storage = Storage::GetInstance(); - // local copy of active m_connections struct ConnectionRef { NetworkConnection* net; @@ -160,20 +158,6 @@ void Dispatcher::DispatchThreadMain() { } } - // grab local storage updates - Storage::UpdateMap updates; - bool delete_all; - storage.GetUpdates(&updates, &delete_all); - - // local entry updates - for (auto& update_entry : updates) { - auto update = update_entry.getValue(); - switch (update.kind) { - default: - break; - } - } - // scan outgoing messages to remove unnecessary updates // send outgoing messages diff --git a/src/Storage.cpp b/src/Storage.cpp index 7e0f580b59..cbdab64135 100644 --- a/src/Storage.cpp +++ b/src/Storage.cpp @@ -19,9 +19,7 @@ using namespace nt; ATOMIC_STATIC_INIT(Storage) -Storage::Storage() { - m_updates_delete_all = false; -} +Storage::Storage() {} Storage::~Storage() {} @@ -60,6 +58,7 @@ void Storage::ProcessIncoming(std::shared_ptr msg, unsigned int id = msg->id(); StringRef name = msg->str(); std::shared_ptr entry; + bool may_need_update = false; if (m_server) { // if we're a server, id=0xffff requests are requests for an id // to be assigned, and we need to send the new assignment back to @@ -79,12 +78,13 @@ void Storage::ProcessIncoming(std::shared_ptr msg, m_idmap.push_back(entry); // send the assignment to everyone (including the originator) - lock.unlock(); - if (m_queue_outgoing) - m_queue_outgoing( - Message::EntryAssign(name, id, entry->seq_num().value(), - msg->value(), msg->flags()), - nullptr, nullptr); + if (m_queue_outgoing) { + auto queue_outgoing = m_queue_outgoing; + auto outmsg = Message::EntryAssign( + name, id, entry->seq_num().value(), msg->value(), msg->flags()); + lock.unlock(); + queue_outgoing(outmsg, nullptr, nullptr); + } return; } if (id >= m_idmap.size() || !m_idmap[id]) { @@ -107,10 +107,15 @@ void Storage::ProcessIncoming(std::shared_ptr msg, if (!entry) { // create local auto& new_entry = m_entries[name]; - if (!new_entry) new_entry = std::make_shared(name); + if (!new_entry) { + // didn't exist at all (rather than just being a response to a + // id assignment request) + new_entry = std::make_shared(name); + new_entry->set_value(msg->value()); + new_entry->set_flags(msg->flags()); + } else + may_need_update = true; // we may need to send an update message entry = new_entry; - entry->set_value(msg->value()); - entry->set_flags(msg->flags()); entry->set_id(id); m_idmap[id] = entry; return; @@ -121,7 +126,16 @@ void Storage::ProcessIncoming(std::shared_ptr msg, // already exists; ignore if sequence number not higher than local SequenceNumber seq_num(msg->seq_num_uid()); - if (seq_num <= entry->seq_num()) return; + if (seq_num <= entry->seq_num()) { + if (may_need_update) { + auto queue_outgoing = m_queue_outgoing; + auto msg = Message::EntryUpdate(entry->id(), entry->seq_num().value(), + entry->value()); + lock.unlock(); + queue_outgoing(msg, nullptr, nullptr); + } + return; + } // sanity check: name should match id if (msg->str() != entry->name()) { @@ -139,12 +153,14 @@ void Storage::ProcessIncoming(std::shared_ptr msg, // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) - lock.unlock(); - if (m_server && m_queue_outgoing) - m_queue_outgoing( + if (m_server && m_queue_outgoing) { + auto queue_outgoing = m_queue_outgoing; + auto outmsg = Message::EntryAssign(entry->name(), id, msg->seq_num_uid(), - msg->value(), entry->flags()), - nullptr, conn); + msg->value(), entry->flags()); + lock.unlock(); + queue_outgoing(outmsg, nullptr, conn); + } break; } case Message::kEntryUpdate: { @@ -168,8 +184,11 @@ void Storage::ProcessIncoming(std::shared_ptr msg, // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) - lock.unlock(); - if (m_server && m_queue_outgoing) m_queue_outgoing(msg, nullptr, conn); + if (m_server && m_queue_outgoing) { + auto queue_outgoing = m_queue_outgoing; + lock.unlock(); + queue_outgoing(msg, nullptr, conn); + } break; } case Message::kFlagsUpdate: { @@ -188,8 +207,11 @@ void Storage::ProcessIncoming(std::shared_ptr msg, // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) - lock.unlock(); - if (m_server && m_queue_outgoing) m_queue_outgoing(msg, nullptr, conn); + if (m_server && m_queue_outgoing) { + auto queue_outgoing = m_queue_outgoing; + lock.unlock(); + queue_outgoing(msg, nullptr, conn); + } break; } case Message::kEntryDelete: { @@ -209,8 +231,11 @@ void Storage::ProcessIncoming(std::shared_ptr msg, // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) - lock.unlock(); - if (m_server && m_queue_outgoing) m_queue_outgoing(msg, nullptr, conn); + if (m_server && m_queue_outgoing) { + auto queue_outgoing = m_queue_outgoing; + lock.unlock(); + queue_outgoing(msg, nullptr, conn); + } break; } case Message::kClearEntries: { @@ -220,8 +245,11 @@ void Storage::ProcessIncoming(std::shared_ptr msg, // broadcast to all other connections (note for client there won't // be any other connections, so don't bother) - lock.unlock(); - if (m_server && m_queue_outgoing) m_queue_outgoing(msg, nullptr, conn); + if (m_server && m_queue_outgoing) { + auto queue_outgoing = m_queue_outgoing; + lock.unlock(); + queue_outgoing(msg, nullptr, conn); + } break; } case Message::kExecuteRpc: @@ -233,149 +261,165 @@ void Storage::ProcessIncoming(std::shared_ptr msg, } } -void Storage::GetUpdates(UpdateMap* updates, bool* delete_all) { - std::lock_guard lock(m_mutex); - m_updates.swap(*updates); - m_updates.clear(); - *delete_all = m_updates_delete_all; - m_updates_delete_all = false; -} - -// Must be called with mutex already held -void Storage::AddUpdate(std::shared_ptr entry, - Update::Kind kind) { - if (kind == Update::kDeleteAll) { - m_updates_delete_all = true; - m_updates.clear(); - return; +void Storage::SendAssignments( + std::function)> send_msg, bool reset_ids) { + std::vector> msgs; + { + std::lock_guard lock(m_mutex); + for (auto& i : m_entries) { + auto entry = i.getValue(); + msgs.emplace_back(Message::EntryAssign(i.getKey(), entry->id(), + entry->seq_num().value(), + entry->value(), entry->flags())); + if (!m_server && reset_ids) entry->set_id(0xffff); + } + if (!m_server && reset_ids) m_idmap.resize(0); } - auto& update = m_updates[entry->name()]; - switch (kind) { - case Update::kAssign: - update.entry = entry; - update.value = entry->value(); - update.flags = entry->flags(); - update.kind = Update::kAssign; - break; - case Update::kValueUpdate: - if (update.kind == Update::kNone) { - update.entry = entry; - update.kind = Update::kValueUpdate; - } else if (update.kind == Update::kFlagsUpdate) { - // Merge value+flags updates as we can only have a single update - // entry. - update.kind = Update::kValueFlagsUpdate; - } else if (update.kind == Update::kDelete) { - // Change delete + update into assign - update.kind = Update::kAssign; - update.flags = entry->flags(); - } - update.value = entry->value(); - break; - case Update::kFlagsUpdate: - if (update.kind == Update::kNone) { - update.entry = entry; - update.kind = Update::kFlagsUpdate; - } else if (update.kind == Update::kValueUpdate) { - // Merge value+flags updates as we can only have a single update - // entry. - update.kind = Update::kValueFlagsUpdate; - } else if (update.kind == Update::kDelete) { - return; // can't do a raw flags update - } - update.flags = entry->flags(); - break; - case Update::kDelete: - update.kind = Update::kDelete; - update.entry = entry; - update.value = nullptr; // release reference if any - break; - default: - assert(false && "unexpected update kind"); - break; - } -} - -std::shared_ptr Storage::FindEntry(StringRef name) const { - std::lock_guard lock(m_mutex); - auto i = m_entries.find(name); - return i == m_entries.end() ? nullptr : i->getValue(); + for (auto& msg : msgs) send_msg(std::move(msg)); } std::shared_ptr Storage::GetEntryValue(StringRef name) const { - auto entry = FindEntry(name); - if (!entry) return nullptr; - return entry->value(); + std::lock_guard lock(m_mutex); + auto i = m_entries.find(name); + return i == m_entries.end() ? nullptr : i->getValue()->value(); } bool Storage::SetEntryValue(StringRef name, std::shared_ptr value) { if (name.empty()) return true; if (!value) return true; - std::lock_guard lock(m_mutex); - auto& entry = m_entries[name]; - if (!entry) entry = std::make_shared(name); + std::unique_lock lock(m_mutex); + auto& new_entry = m_entries[name]; + if (!new_entry) new_entry = std::make_shared(name); + auto entry = new_entry; auto old_value = entry->value(); if (old_value && old_value->type() != value->type()) return false; // error on type mismatch entry->set_value(value); - // put on update queue - if (!old_value) - AddUpdate(entry, Update::kAssign); - else if (*old_value != *value) - AddUpdate(entry, Update::kValueUpdate); + + // if we're the server, assign an id if it doesn't have one + if (m_server && entry->id() == 0xffff) { + unsigned int id = m_idmap.size(); + entry->set_id(id); + m_idmap.push_back(entry); + } + + // generate message + if (!m_queue_outgoing) return true; + auto queue_outgoing = m_queue_outgoing; + if (!old_value) { + auto msg = Message::EntryAssign(name, entry->id(), entry->seq_num().value(), + value, entry->flags()); + lock.unlock(); + queue_outgoing(msg, nullptr, nullptr); + } else if (*old_value != *value) { + entry->seq_num_inc(); + // don't send an update if we don't have an assigned id yet + if (entry->id() != 0xffff) { + auto msg = + Message::EntryUpdate(entry->id(), entry->seq_num().value(), value); + lock.unlock(); + queue_outgoing(msg, nullptr, nullptr); + } + } return true; } void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr value) { if (name.empty()) return; if (!value) return; - std::lock_guard lock(m_mutex); + std::unique_lock lock(m_mutex); auto& entry = m_entries[name]; if (!entry) entry = std::make_shared(name); auto old_value = entry->value(); entry->set_value(value); - if (!old_value || *old_value != *value) { - // put on update queue - if (!old_value || old_value->type() != value->type()) - AddUpdate(entry, Update::kAssign); - else - AddUpdate(entry, Update::kValueUpdate); + if (old_value && *old_value == *value) return; + + // if we're the server, assign an id if it doesn't have one + if (m_server && entry->id() == 0xffff) { + unsigned int id = m_idmap.size(); + entry->set_id(id); + m_idmap.push_back(entry); + } + + // generate message + if (!m_queue_outgoing) return; + auto queue_outgoing = m_queue_outgoing; + if (!old_value || old_value->type() != value->type()) { + entry->seq_num_inc(); + auto msg = Message::EntryAssign(name, entry->id(), entry->seq_num().value(), + value, entry->flags()); + lock.unlock(); + queue_outgoing(msg, nullptr, nullptr); + } else { + entry->seq_num_inc(); + // don't send an update if we don't have an assigned id yet + if (entry->id() != 0xffff) { + auto msg = + Message::EntryUpdate(entry->id(), entry->seq_num().value(), value); + lock.unlock(); + queue_outgoing(msg, nullptr, nullptr); + } } } void Storage::SetEntryFlags(StringRef name, unsigned int flags) { if (name.empty()) return; - std::lock_guard lock(m_mutex); + std::unique_lock lock(m_mutex); auto i = m_entries.find(name); if (i == m_entries.end()) return; auto entry = i->getValue(); - if (entry->flags() != flags) { - entry->set_flags(flags); - AddUpdate(entry, Update::kFlagsUpdate); // put on update queue + if (entry->flags() == flags) return; + entry->set_flags(flags); + + // generate message + if (!m_queue_outgoing) return; + auto queue_outgoing = m_queue_outgoing; + unsigned int id = entry->id(); + // don't send an update if we don't have an assigned id yet + if (id != 0xffff) { + lock.unlock(); + queue_outgoing(Message::FlagsUpdate(id, flags), nullptr, nullptr); } } unsigned int Storage::GetEntryFlags(StringRef name) const { - auto entry = FindEntry(name); - if (!entry) return 0; - return entry->flags(); + std::lock_guard lock(m_mutex); + auto i = m_entries.find(name); + return i == m_entries.end() ? 0 : i->getValue()->flags(); } void Storage::DeleteEntry(StringRef name) { - std::lock_guard lock(m_mutex); + std::unique_lock lock(m_mutex); auto i = m_entries.find(name); if (i == m_entries.end()) return; auto entry = i->getValue(); + unsigned int id = entry->id(); m_entries.erase(i); // erase from map - // if it had a value, put on update queue - if (entry->value()) AddUpdate(entry, Update::kDelete); + if (id < m_idmap.size()) m_idmap[id].reset(); + + if (!entry->value()) return; + + // if it had a value, generate message + // don't send an update if we don't have an assigned id yet + if (id != 0xffff) { + if (!m_queue_outgoing) return; + auto queue_outgoing = m_queue_outgoing; + lock.unlock(); + queue_outgoing(Message::EntryDelete(id), nullptr, nullptr); + } } void Storage::DeleteAllEntries() { - std::lock_guard lock(m_mutex); + std::unique_lock lock(m_mutex); if (m_entries.empty()) return; m_entries.clear(); - AddUpdate(nullptr, Update::kDeleteAll); // put on update queue + m_idmap.resize(0); + + // generate message + if (!m_queue_outgoing) return; + auto queue_outgoing = m_queue_outgoing; + lock.unlock(); + queue_outgoing(Message::ClearEntries(), nullptr, nullptr); } std::vector Storage::GetEntryInfo(StringRef prefix, @@ -821,7 +865,8 @@ next_line: // copy values into storage as quickly as possible so lock isn't held { - std::lock_guard lock(m_mutex); + std::vector> msgs; + std::unique_lock lock(m_mutex); for (auto& i : entries) { auto& entry = m_entries[i.first]; if (!entry) entry = std::make_shared(i.first); @@ -830,16 +875,36 @@ next_line: bool was_persist = entry->IsPersistent(); if (!was_persist) entry->set_flags(entry->flags() | NT_PERSISTENT); + // if we're the server, assign an id if it doesn't have one + if (m_server && entry->id() == 0xffff) { + unsigned int id = m_idmap.size(); + entry->set_id(id); + m_idmap.push_back(entry); + } + + if (!m_queue_outgoing) continue; // shortcut + entry->seq_num_inc(); + // put on update queue if (!old_value || old_value->type() != i.second->type()) - AddUpdate(entry, Update::kAssign); - else { + msgs.emplace_back(Message::EntryAssign(i.first, entry->id(), + entry->seq_num().value(), + i.second, entry->flags())); + else if (entry->id() != 0xffff) { + // don't send an update if we don't have an assigned id yet if (*old_value != *i.second) - AddUpdate(entry, Update::kValueUpdate); + msgs.emplace_back(Message::EntryUpdate( + entry->id(), entry->seq_num().value(), i.second)); if (!was_persist) - AddUpdate(entry, Update::kFlagsUpdate); + msgs.emplace_back(Message::FlagsUpdate(entry->id(), entry->flags())); } } + + if (m_queue_outgoing) { + auto queue_outgoing = m_queue_outgoing; + lock.unlock(); + for (auto& msg : msgs) queue_outgoing(std::move(msg), nullptr, nullptr); + } } return true; diff --git a/src/Storage.h b/src/Storage.h index f2301b3a56..fcab231d7f 100644 --- a/src/Storage.h +++ b/src/Storage.h @@ -28,28 +28,12 @@ class StorageTest; class StorageEntry { public: - StorageEntry(llvm::StringRef name) : m_name(name), m_id(0xffff) { - m_flags = 0; - } + StorageEntry(llvm::StringRef name) : m_name(name), m_flags(0), m_id(0xffff) {} bool IsPersistent() const { return (m_flags & NT_PERSISTENT) != 0; } - std::shared_ptr value() const { -#ifdef HAVE_SHARED_PTR_ATOMIC_LOAD - return std::atomic_load(&m_value); -#else - std::lock_guard lock(m_value_mutex); - return m_value; -#endif - } - void set_value(std::shared_ptr value) { -#ifdef HAVE_SHARED_PTR_ATOMIC_LOAD - std::atomic_store(&m_value, value); -#else - std::lock_guard lock(m_value_mutex); - m_value = value; -#endif - } + std::shared_ptr value() const { return m_value; } + void set_value(std::shared_ptr value) { m_value = value; } unsigned int flags() const { return m_flags; } void set_flags(unsigned int flags) { m_flags = flags; } @@ -64,17 +48,9 @@ class StorageEntry { SequenceNumber seq_num_inc() { return ++m_seq_num; } private: - // These variables are accessed by both Dispatcher and user, so must use - // atomic accesses. Unfortunately, atomic shared_ptr is not yet available - // on most compilers, so we need an explicit mutex instead. -#ifndef HAVE_SHARED_PTR_ATOMIC_LOAD - mutable std::mutex m_value_mutex; -#endif - std::shared_ptr m_value; - std::atomic_uint m_flags; - - // Only accessed from the Dispatcher, so these are NOT mutex-protected. std::string m_name; + std::shared_ptr m_value; + unsigned int m_flags; unsigned int m_id; SequenceNumber m_seq_num; }; @@ -88,25 +64,7 @@ class Storage { } ~Storage(); - struct Update { - enum Kind { - kNone, - kAssign, - kValueUpdate, - kFlagsUpdate, - kValueFlagsUpdate, - kDelete, - kDeleteAll - }; - Update() : flags(0), kind(kNone) {} - - std::shared_ptr entry; - std::shared_ptr value; - unsigned int flags; - Kind kind; - }; - typedef llvm::StringMap UpdateMap; - + // Accessors required by Dispatcher. typedef std::function msg, NetworkConnection* only, NetworkConnection* except)> QueueOutgoingFunc; @@ -117,12 +75,9 @@ class Storage { void ProcessIncoming(std::shared_ptr msg, NetworkConnection* conn, unsigned int proto_rev); + void SendAssignments(std::function)> send_msg, + bool reset_ids); - // Finds, but does not create entry. Returns nullptr if not found. - std::shared_ptr FindEntry(StringRef name) const; - - // Accessors required by Dispatcher. - void GetUpdates(UpdateMap* updates, bool* delete_all); std::mutex& mutex() { return m_mutex; } // User functions @@ -145,16 +100,12 @@ class Storage { Storage(const Storage&) = delete; Storage& operator=(const Storage&) = delete; - void AddUpdate(std::shared_ptr entry, Update::Kind kind); - typedef llvm::StringMap> EntriesMap; typedef std::vector> IdMap; mutable std::mutex m_mutex; EntriesMap m_entries; IdMap m_idmap; - UpdateMap m_updates; - bool m_updates_delete_all; QueueOutgoingFunc m_queue_outgoing; bool m_server; diff --git a/test/unit/StorageTest.cpp b/test/unit/StorageTest.cpp index 1cf4f6833c..346238adc1 100644 --- a/test/unit/StorageTest.cpp +++ b/test/unit/StorageTest.cpp @@ -14,17 +14,33 @@ namespace nt { -class StorageTest : public ::testing::Test { +class StorageTest : public ::testing::TestWithParam { public: + StorageTest() { + using namespace std::placeholders; + storage.SetOutgoing( + std::bind(&StorageTest::QueueOutgoing, this, _1, _2, _3), GetParam()); + } + Storage::EntriesMap& entries() { return storage.m_entries; } - void GetUpdates() { storage.GetUpdates(&updates, &delete_all); } + Storage::IdMap& idmap() { return storage.m_idmap; } + std::shared_ptr GetEntry(StringRef name) { - auto& entry = storage.m_entries[name]; - if (!entry) entry = std::make_shared(name); - return entry; + auto i = storage.m_entries.find(name); + return i == storage.m_entries.end() ? std::make_shared(name) + : i->getValue(); + } + struct OutgoingData { + std::shared_ptr msg; + NetworkConnection* only; + NetworkConnection* except; + }; + void QueueOutgoing(std::shared_ptr msg, NetworkConnection* only, + NetworkConnection* except) { + outgoing.emplace_back(OutgoingData{msg, only, except}); } Storage storage; - Storage::UpdateMap updates; + std::vector outgoing; bool delete_all; }; @@ -32,7 +48,7 @@ class StorageTestPopulateOne : public StorageTest { public: StorageTestPopulateOne() { storage.SetEntryTypeValue("foo", Value::MakeBoolean(true)); - GetUpdates(); + outgoing.clear(); } }; @@ -43,7 +59,7 @@ class StorageTestPopulated : public StorageTest { storage.SetEntryTypeValue("foo2", Value::MakeDouble(0.0)); storage.SetEntryTypeValue("bar", Value::MakeDouble(1.0)); storage.SetEntryTypeValue("bar2", Value::MakeBoolean(false)); - GetUpdates(); + outgoing.clear(); } }; @@ -86,7 +102,7 @@ class StorageTestPersistent : public StorageTest { Value::MakeStringArray(std::vector{"hello", "world\n"})); storage.SetEntryTypeValue(StringRef("\0\3\5\n", 4), Value::MakeBoolean(true)); - GetUpdates(); + outgoing.clear(); } }; @@ -95,31 +111,13 @@ class MockLoadWarn { MOCK_METHOD2(Warn, void(std::size_t line, const char* msg)); }; -TEST_F(StorageTest, Construct) { - ASSERT_TRUE(entries().empty()); - GetUpdates(); - ASSERT_TRUE(updates.empty()); - ASSERT_FALSE(delete_all); +TEST_P(StorageTest, Construct) { + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); } -TEST_F(StorageTest, FindEntryNotExist) { - ASSERT_FALSE(storage.FindEntry("foo")); - ASSERT_TRUE(entries().empty()); -} - -TEST_F(StorageTest, FindEntryExist) { - auto entry1 = GetEntry("foo"); - auto entry = storage.FindEntry("foo"); - ASSERT_TRUE(bool(entry)); - ASSERT_EQ(entry1, entry); - ASSERT_FALSE(storage.FindEntry("bar")); -} - -TEST_F(StorageTest, StorageEntryInit) { +TEST_P(StorageTest, StorageEntryInit) { auto entry = GetEntry("foo"); - ASSERT_TRUE(bool(entry)); - ASSERT_EQ(1u, entries().size()); - EXPECT_FALSE(entry->value()); EXPECT_EQ(0u, entry->flags()); EXPECT_EQ("foo", entry->name()); @@ -127,247 +125,298 @@ TEST_F(StorageTest, StorageEntryInit) { EXPECT_EQ(SequenceNumber(), entry->seq_num()); } -TEST_F(StorageTest, GetEntryValueNotExist) { - ASSERT_FALSE(storage.GetEntryValue("foo")); - ASSERT_TRUE(entries().empty()); +TEST_P(StorageTest, GetEntryValueNotExist) { + EXPECT_FALSE(storage.GetEntryValue("foo")); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTest, GetEntryValueExist) { +TEST_P(StorageTest, GetEntryValueExist) { auto value = Value::MakeBoolean(true); storage.SetEntryTypeValue("foo", value); - GetUpdates(); - EXPECT_EQ(1u, updates.size()); - EXPECT_EQ(updates["foo"].value, storage.GetEntryValue("foo")); + outgoing.clear(); + EXPECT_EQ(value, storage.GetEntryValue("foo")); } -TEST_F(StorageTest, SetEntryTypeValueAssignNew) { +TEST_P(StorageTest, SetEntryTypeValueAssignNew) { // brand new entry auto value = Value::MakeBoolean(true); storage.SetEntryTypeValue("foo", value); - auto entry = GetEntry("foo"); - EXPECT_EQ(value, entry->value()); - GetUpdates(); - EXPECT_EQ(1u, updates.size()); - EXPECT_FALSE(delete_all); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kAssign, update.kind); + EXPECT_EQ(value, GetEntry("foo")->value()); + if (GetParam()) { + ASSERT_EQ(1u, idmap().size()); + EXPECT_EQ(value, idmap()[0]->value()); + } else { + EXPECT_TRUE(idmap().empty()); + } + + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kEntryAssign, msg->type()); + EXPECT_EQ("foo", msg->str()); + if (GetParam()) + EXPECT_EQ(0u, msg->id()); // assigned as server + else + EXPECT_EQ(0xffffu, msg->id()); // not assigned as client + EXPECT_EQ(1u, msg->seq_num_uid()); + EXPECT_EQ(value, msg->value()); + EXPECT_EQ(0u, msg->flags()); } -TEST_F(StorageTestPopulateOne, SetEntryTypeValueAssignTypeChange) { +TEST_P(StorageTestPopulateOne, SetEntryTypeValueAssignTypeChange) { // update with different type results in assignment message auto value = Value::MakeDouble(0.0); storage.SetEntryTypeValue("foo", value); - auto entry = GetEntry("foo"); - EXPECT_EQ(value, entry->value()); - GetUpdates(); - EXPECT_EQ(1u, updates.size()); - EXPECT_FALSE(delete_all); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kAssign, update.kind); + EXPECT_EQ(value, GetEntry("foo")->value()); + + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kEntryAssign, msg->type()); + EXPECT_EQ("foo", msg->str()); + if (GetParam()) + EXPECT_EQ(0u, msg->id()); // assigned as server + else + EXPECT_EQ(0xffffu, msg->id()); // not assigned as client + EXPECT_EQ(2u, msg->seq_num_uid()); // incremented + EXPECT_EQ(value, msg->value()); + EXPECT_EQ(0u, msg->flags()); } -TEST_F(StorageTestPopulateOne, SetEntryTypeValueEqualValue) { +TEST_P(StorageTestPopulateOne, SetEntryTypeValueEqualValue) { // update with same type and same value: change value contents but no update // message is issued (minimizing bandwidth usage) auto value = Value::MakeBoolean(true); storage.SetEntryTypeValue("foo", value); - auto entry = GetEntry("foo"); - EXPECT_EQ(value, entry->value()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); - EXPECT_FALSE(delete_all); + EXPECT_EQ(value, GetEntry("foo")->value()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTestPopulateOne, SetEntryTypeValueDifferentValue) { +TEST_P(StorageTestPopulated, SetEntryTypeValueDifferentValue) { // update with same type and different value results in value update message - auto value = Value::MakeBoolean(false); - storage.SetEntryTypeValue("foo", value); - auto entry = GetEntry("foo"); - EXPECT_EQ(value, entry->value()); - GetUpdates(); - EXPECT_EQ(1u, updates.size()); - EXPECT_FALSE(delete_all); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kValueUpdate, update.kind); + auto value = Value::MakeDouble(1.0); + storage.SetEntryTypeValue("foo2", value); + EXPECT_EQ(value, GetEntry("foo2")->value()); + + if (GetParam()) { + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kEntryUpdate, msg->type()); + EXPECT_EQ(1u, msg->id()); // assigned as server + EXPECT_EQ(2u, msg->seq_num_uid()); // incremented + EXPECT_EQ(value, msg->value()); + } else { + // shouldn't send an update id not assigned yet (happens on client only) + EXPECT_TRUE(outgoing.empty()); + EXPECT_EQ(2u, GetEntry("foo2")->seq_num().value()); // still should be incremented + } } -TEST_F(StorageTest, SetEntryTypeValueEmptyName) { +TEST_P(StorageTest, SetEntryTypeValueEmptyName) { auto value = Value::MakeBoolean(true); storage.SetEntryTypeValue("", value); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); - EXPECT_FALSE(delete_all); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTest, SetEntryTypeValueEmptyValue) { +TEST_P(StorageTest, SetEntryTypeValueEmptyValue) { storage.SetEntryTypeValue("foo", nullptr); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); - EXPECT_FALSE(delete_all); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTest, SetEntryValueAssignNew) { +TEST_P(StorageTest, SetEntryValueAssignNew) { // brand new entry auto value = Value::MakeBoolean(true); EXPECT_TRUE(storage.SetEntryValue("foo", value)); - auto entry = GetEntry("foo"); - EXPECT_EQ(value, entry->value()); - GetUpdates(); - EXPECT_EQ(1u, updates.size()); - EXPECT_FALSE(delete_all); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kAssign, update.kind); + EXPECT_EQ(value, GetEntry("foo")->value()); + + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kEntryAssign, msg->type()); + EXPECT_EQ("foo", msg->str()); + if (GetParam()) + EXPECT_EQ(0u, msg->id()); // assigned as server + else + EXPECT_EQ(0xffffu, msg->id()); // not assigned as client + EXPECT_EQ(0u, msg->seq_num_uid()); + EXPECT_EQ(value, msg->value()); + EXPECT_EQ(0u, msg->flags()); } -TEST_F(StorageTestPopulateOne, SetEntryValueAssignTypeChange) { +TEST_P(StorageTestPopulateOne, SetEntryValueAssignTypeChange) { // update with different type results in error and no message auto value = Value::MakeDouble(0.0); EXPECT_FALSE(storage.SetEntryValue("foo", value)); auto entry = GetEntry("foo"); EXPECT_NE(value, entry->value()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); - EXPECT_FALSE(delete_all); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTestPopulateOne, SetEntryValueEqualValue) { +TEST_P(StorageTestPopulateOne, SetEntryValueEqualValue) { // update with same type and same value: change value contents but no update // message is issued (minimizing bandwidth usage) auto value = Value::MakeBoolean(true); EXPECT_TRUE(storage.SetEntryValue("foo", value)); auto entry = GetEntry("foo"); EXPECT_EQ(value, entry->value()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); - EXPECT_FALSE(delete_all); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTestPopulateOne, SetEntryValueDifferentValue) { +TEST_P(StorageTestPopulated, SetEntryValueDifferentValue) { // update with same type and different value results in value update message - auto value = Value::MakeBoolean(false); - EXPECT_TRUE(storage.SetEntryValue("foo", value)); - auto entry = GetEntry("foo"); + auto value = Value::MakeDouble(1.0); + EXPECT_TRUE(storage.SetEntryValue("foo2", value)); + auto entry = GetEntry("foo2"); EXPECT_EQ(value, entry->value()); - GetUpdates(); - EXPECT_EQ(1u, updates.size()); - EXPECT_FALSE(delete_all); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kValueUpdate, update.kind); + + if (GetParam()) { + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kEntryUpdate, msg->type()); + EXPECT_EQ(1u, msg->id()); // assigned as server + EXPECT_EQ(2u, msg->seq_num_uid()); // incremented + EXPECT_EQ(value, msg->value()); + } else { + // shouldn't send an update id not assigned yet (happens on client only) + EXPECT_TRUE(outgoing.empty()); + EXPECT_EQ(2u, GetEntry("foo2")->seq_num().value()); // still should be incremented + } } -TEST_F(StorageTest, SetEntryValueEmptyName) { +TEST_P(StorageTest, SetEntryValueEmptyName) { auto value = Value::MakeBoolean(true); EXPECT_TRUE(storage.SetEntryValue("", value)); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); - EXPECT_FALSE(delete_all); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTest, SetEntryValueEmptyValue) { +TEST_P(StorageTest, SetEntryValueEmptyValue) { EXPECT_TRUE(storage.SetEntryValue("foo", nullptr)); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); - EXPECT_FALSE(delete_all); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTest, SetEntryFlagsNew) { +TEST_P(StorageTest, SetEntryFlagsNew) { // flags setting doesn't create an entry storage.SetEntryFlags("foo", 0u); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); - EXPECT_FALSE(delete_all); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTestPopulateOne, SetEntryFlagsEqualValue) { +TEST_P(StorageTestPopulateOne, SetEntryFlagsEqualValue) { // update with same value: no update message is issued (minimizing bandwidth // usage) storage.SetEntryFlags("foo", 0u); auto entry = GetEntry("foo"); EXPECT_EQ(0u, entry->flags()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); - EXPECT_FALSE(delete_all); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTestPopulateOne, SetEntryFlagsDifferentValue) { +TEST_P(StorageTestPopulated, SetEntryFlagsDifferentValue) { // update with different value results in flags update message - storage.SetEntryFlags("foo", 1u); - auto entry = GetEntry("foo"); - EXPECT_EQ(1u, entry->flags()); - GetUpdates(); - EXPECT_EQ(1u, updates.size()); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kFlagsUpdate, update.kind); + storage.SetEntryFlags("foo2", 1u); + EXPECT_EQ(1u, GetEntry("foo2")->flags()); + + if (GetParam()) { + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kFlagsUpdate, msg->type()); + EXPECT_EQ(1u, msg->id()); // assigned as server + EXPECT_EQ(1u, msg->flags()); + } else { + // shouldn't send an update id not assigned yet (happens on client only) + EXPECT_TRUE(outgoing.empty()); + } } -TEST_F(StorageTest, SetEntryFlagsEmptyName) { +TEST_P(StorageTest, SetEntryFlagsEmptyName) { storage.SetEntryFlags("", 0u); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTest, GetEntryFlagsNotExist) { - ASSERT_EQ(0u, storage.GetEntryFlags("foo")); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); +TEST_P(StorageTest, GetEntryFlagsNotExist) { + EXPECT_EQ(0u, storage.GetEntryFlags("foo")); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTestPopulateOne, GetEntryFlagsExist) { +TEST_P(StorageTestPopulateOne, GetEntryFlagsExist) { storage.SetEntryFlags("foo", 1u); - GetUpdates(); - ASSERT_EQ(1u, storage.GetEntryFlags("foo")); + outgoing.clear(); + EXPECT_EQ(1u, storage.GetEntryFlags("foo")); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTest, DeleteEntryNotExist) { +TEST_P(StorageTest, DeleteEntryNotExist) { storage.DeleteEntry("foo"); - GetUpdates(); - EXPECT_TRUE(updates.empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTestPopulateOne, DeleteEntryExist) { - auto entry = GetEntry("foo"); - storage.DeleteEntry("foo"); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - EXPECT_EQ(1u, updates.size()); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kDelete, update.kind); +TEST_P(StorageTestPopulated, DeleteEntryExist) { + auto entry = GetEntry("foo2"); + storage.DeleteEntry("foo2"); + EXPECT_TRUE(entries().count("foo2") == 0); + if (GetParam()) { + ASSERT_TRUE(idmap().size() >= 2); + EXPECT_FALSE(idmap()[1]); + } + + if (GetParam()) { + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kEntryDelete, msg->type()); + EXPECT_EQ(1u, msg->id()); // assigned as server + } else { + // shouldn't send an update id not assigned yet (happens on client only) + EXPECT_TRUE(outgoing.empty()); + } } -TEST_F(StorageTest, DeleteAllEntriesEmpty) { +TEST_P(StorageTest, DeleteAllEntriesEmpty) { storage.DeleteAllEntries(); - GetUpdates(); - EXPECT_TRUE(updates.empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTestPopulated, DeleteAllEntries) { +TEST_P(StorageTestPopulated, DeleteAllEntries) { storage.DeleteAllEntries(); ASSERT_TRUE(entries().empty()); - GetUpdates(); - EXPECT_TRUE(updates.empty()); - ASSERT_TRUE(delete_all); + + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kClearEntries, msg->type()); } -TEST_F(StorageTestPopulated, GetEntryInfoAll) { +TEST_P(StorageTestPopulated, GetEntryInfoAll) { auto info = storage.GetEntryInfo("", 0u); ASSERT_EQ(4u, info.size()); } -TEST_F(StorageTestPopulated, GetEntryInfoPrefix) { +TEST_P(StorageTestPopulated, GetEntryInfoPrefix) { auto info = storage.GetEntryInfo("foo", 0u); ASSERT_EQ(2u, info.size()); if (info[0].name == "foo") { @@ -383,7 +432,7 @@ TEST_F(StorageTestPopulated, GetEntryInfoPrefix) { } } -TEST_F(StorageTestPopulated, GetEntryInfoTypes) { +TEST_P(StorageTestPopulated, GetEntryInfoTypes) { auto info = storage.GetEntryInfo("", NT_DOUBLE); ASSERT_EQ(2u, info.size()); EXPECT_EQ(NT_DOUBLE, info[0].type); @@ -397,20 +446,20 @@ TEST_F(StorageTestPopulated, GetEntryInfoTypes) { } } -TEST_F(StorageTestPopulated, GetEntryInfoPrefixTypes) { +TEST_P(StorageTestPopulated, GetEntryInfoPrefixTypes) { auto info = storage.GetEntryInfo("bar", NT_BOOLEAN); ASSERT_EQ(1u, info.size()); EXPECT_EQ("bar2", info[0].name); EXPECT_EQ(NT_BOOLEAN, info[0].type); } -TEST_F(StorageTestPersistent, SavePersistentEmpty) { +TEST_P(StorageTestPersistent, SavePersistentEmpty) { std::ostringstream oss; storage.SavePersistent(oss); ASSERT_EQ("[NetworkTables Storage 3.0]\n", oss.str()); } -TEST_F(StorageTestPersistent, SavePersistent) { +TEST_P(StorageTestPersistent, SavePersistent) { for (auto& i : entries()) i.getValue()->set_flags(NT_PERSISTENT); std::ostringstream oss; storage.SavePersistent(oss); @@ -465,7 +514,7 @@ TEST_F(StorageTestPersistent, SavePersistent) { ASSERT_EQ("", line); } -TEST_F(StorageTest, LoadPersistentBadHeader) { +TEST_P(StorageTest, LoadPersistentBadHeader) { MockLoadWarn warn; auto warn_func = [&](std::size_t line, const char* msg) { warn.Warn(line, msg); }; @@ -477,12 +526,12 @@ TEST_F(StorageTest, LoadPersistentBadHeader) { std::istringstream iss2("[NetworkTables"); EXPECT_CALL(warn, Warn(1, "header line mismatch, ignoring rest of file")); EXPECT_FALSE(storage.LoadPersistent(iss2, warn_func)); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - ASSERT_TRUE(updates.empty()); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTest, LoadPersistentCommentHeader) { +TEST_P(StorageTest, LoadPersistentCommentHeader) { MockLoadWarn warn; auto warn_func = [&](std::size_t line, const char* msg) { warn.Warn(line, msg); }; @@ -490,12 +539,12 @@ TEST_F(StorageTest, LoadPersistentCommentHeader) { std::istringstream iss( "\n; comment\n# comment\n[NetworkTables Storage 3.0]\n"); EXPECT_TRUE(storage.LoadPersistent(iss, warn_func)); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - ASSERT_TRUE(updates.empty()); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTest, LoadPersistentEmptyName) { +TEST_P(StorageTest, LoadPersistentEmptyName) { MockLoadWarn warn; auto warn_func = [&](std::size_t line, const char* msg) { warn.Warn(line, msg); }; @@ -503,12 +552,12 @@ TEST_F(StorageTest, LoadPersistentEmptyName) { std::istringstream iss( "[NetworkTables Storage 3.0]\nboolean \"\"=true\n"); EXPECT_TRUE(storage.LoadPersistent(iss, warn_func)); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - ASSERT_TRUE(updates.empty()); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } -TEST_F(StorageTest, LoadPersistentAssign) { +TEST_P(StorageTest, LoadPersistentAssign) { MockLoadWarn warn; auto warn_func = [&](std::size_t line, const char* msg) { warn.Warn(line, msg); }; @@ -519,77 +568,114 @@ TEST_F(StorageTest, LoadPersistentAssign) { auto entry = GetEntry("foo"); EXPECT_EQ(*Value::MakeBoolean(true), *entry->value()); EXPECT_EQ(NT_PERSISTENT, entry->flags()); - GetUpdates(); - EXPECT_EQ(1u, updates.size()); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kAssign, update.kind); - EXPECT_EQ(entry->value(), update.value); - EXPECT_EQ(NT_PERSISTENT, update.flags); + + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kEntryAssign, msg->type()); + EXPECT_EQ("foo", msg->str()); + if (GetParam()) + EXPECT_EQ(0u, msg->id()); // assigned as server + else + EXPECT_EQ(0xffffu, msg->id()); // not assigned as client + EXPECT_EQ(1u, msg->seq_num_uid()); + EXPECT_EQ(*Value::MakeBoolean(true), *msg->value()); + EXPECT_EQ(NT_PERSISTENT, msg->flags()); } -TEST_F(StorageTestPopulateOne, LoadPersistentUpdateFlags) { +TEST_P(StorageTestPopulated, LoadPersistentUpdateFlags) { MockLoadWarn warn; auto warn_func = [&](std::size_t line, const char* msg) { warn.Warn(line, msg); }; std::istringstream iss( - "[NetworkTables Storage 3.0]\nboolean \"foo\"=true\n"); + "[NetworkTables Storage 3.0]\ndouble \"foo2\"=0.0\n"); EXPECT_TRUE(storage.LoadPersistent(iss, warn_func)); - auto entry = GetEntry("foo"); - EXPECT_EQ(*Value::MakeBoolean(true), *entry->value()); + auto entry = GetEntry("foo2"); + EXPECT_EQ(*Value::MakeDouble(0.0), *entry->value()); EXPECT_EQ(NT_PERSISTENT, entry->flags()); - GetUpdates(); - ASSERT_EQ(1u, updates.size()); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kFlagsUpdate, update.kind); - EXPECT_EQ(NT_PERSISTENT, update.flags); + + if (GetParam()) { + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kFlagsUpdate, msg->type()); + EXPECT_EQ(1u, msg->id()); // assigned as server + EXPECT_EQ(NT_PERSISTENT, msg->flags()); + } else { + // shouldn't send an update id not assigned yet (happens on client only) + EXPECT_TRUE(outgoing.empty()); + } } -TEST_F(StorageTestPopulateOne, LoadPersistentUpdateValue) { +TEST_P(StorageTestPopulated, LoadPersistentUpdateValue) { MockLoadWarn warn; auto warn_func = [&](std::size_t line, const char* msg) { warn.Warn(line, msg); }; - GetEntry("foo")->set_flags(NT_PERSISTENT); - GetUpdates(); + GetEntry("foo2")->set_flags(NT_PERSISTENT); std::istringstream iss( - "[NetworkTables Storage 3.0]\nboolean \"foo\"=false\n"); + "[NetworkTables Storage 3.0]\ndouble \"foo2\"=1.0\n"); EXPECT_TRUE(storage.LoadPersistent(iss, warn_func)); - auto entry = GetEntry("foo"); - EXPECT_EQ(*Value::MakeBoolean(false), *entry->value()); + auto entry = GetEntry("foo2"); + EXPECT_EQ(*Value::MakeDouble(1.0), *entry->value()); EXPECT_EQ(NT_PERSISTENT, entry->flags()); - GetUpdates(); - EXPECT_EQ(1u, updates.size()); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kValueUpdate, update.kind); - EXPECT_EQ(entry->value(), update.value); + + if (GetParam()) { + ASSERT_EQ(1u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kEntryUpdate, msg->type()); + EXPECT_EQ(1u, msg->id()); // assigned as server + EXPECT_EQ(2u, msg->seq_num_uid()); // incremented + EXPECT_EQ(*Value::MakeDouble(1.0), *msg->value()); + } else { + // shouldn't send an update id not assigned yet (happens on client only) + EXPECT_TRUE(outgoing.empty()); + EXPECT_EQ(2u, GetEntry("foo2")->seq_num().value()); // still should be incremented + } } -TEST_F(StorageTestPopulateOne, LoadPersistentUpdateValueFlags) { +TEST_P(StorageTestPopulated, LoadPersistentUpdateValueFlags) { MockLoadWarn warn; auto warn_func = [&](std::size_t line, const char* msg) { warn.Warn(line, msg); }; std::istringstream iss( - "[NetworkTables Storage 3.0]\nboolean \"foo\"=false\n"); + "[NetworkTables Storage 3.0]\ndouble \"foo2\"=1.0\n"); EXPECT_TRUE(storage.LoadPersistent(iss, warn_func)); - auto entry = GetEntry("foo"); - EXPECT_EQ(*Value::MakeBoolean(false), *entry->value()); + auto entry = GetEntry("foo2"); + EXPECT_EQ(*Value::MakeDouble(1.0), *entry->value()); EXPECT_EQ(NT_PERSISTENT, entry->flags()); - GetUpdates(); - ASSERT_EQ(1u, updates.size()); - auto& update = updates["foo"]; - EXPECT_EQ(entry, update.entry); - EXPECT_EQ(Storage::Update::kValueFlagsUpdate, update.kind); - EXPECT_EQ(entry->value(), update.value); - EXPECT_EQ(NT_PERSISTENT, update.flags); + + if (GetParam()) { + ASSERT_EQ(2u, outgoing.size()); + EXPECT_FALSE(outgoing[0].only); + EXPECT_FALSE(outgoing[0].except); + auto msg = outgoing[0].msg; + EXPECT_EQ(Message::kEntryUpdate, msg->type()); + EXPECT_EQ(1u, msg->id()); // assigned as server + EXPECT_EQ(2u, msg->seq_num_uid()); // incremented + EXPECT_EQ(*Value::MakeDouble(1.0), *msg->value()); + + EXPECT_FALSE(outgoing[1].only); + EXPECT_FALSE(outgoing[1].except); + msg = outgoing[1].msg; + EXPECT_EQ(Message::kFlagsUpdate, msg->type()); + EXPECT_EQ(1u, msg->id()); // assigned as server + EXPECT_EQ(NT_PERSISTENT, msg->flags()); + } else { + // shouldn't send an update id not assigned yet (happens on client only) + EXPECT_TRUE(outgoing.empty()); + EXPECT_EQ(2u, GetEntry("foo2")->seq_num().value()); // still should be incremented + } } -TEST_F(StorageTest, LoadPersistent) { +TEST_P(StorageTest, LoadPersistent) { MockLoadWarn warn; auto warn_func = [&](std::size_t line, const char* msg) { warn.Warn(line, msg); }; @@ -620,8 +706,7 @@ TEST_F(StorageTest, LoadPersistent) { std::istringstream iss(in); EXPECT_TRUE(storage.LoadPersistent(iss, warn_func)); ASSERT_EQ(21u, entries().size()); - GetUpdates(); - EXPECT_EQ(21u, updates.size()); + EXPECT_EQ(21u, outgoing.size()); EXPECT_EQ(*Value::MakeBoolean(true), *storage.GetEntryValue("boolean/true")); EXPECT_EQ(*Value::MakeBoolean(false), @@ -663,7 +748,7 @@ TEST_F(StorageTest, LoadPersistent) { *storage.GetEntryValue(StringRef("\0\3\5\n", 4))); } -TEST_F(StorageTest, LoadPersistentWarn) { +TEST_P(StorageTest, LoadPersistentWarn) { MockLoadWarn warn; auto warn_func = [&](std::size_t line, const char* msg) { warn.Warn(line, msg); }; @@ -674,9 +759,17 @@ TEST_F(StorageTest, LoadPersistentWarn) { Warn(2, "unrecognized boolean value, not 'true' or 'false'")); EXPECT_TRUE(storage.LoadPersistent(iss, warn_func)); - ASSERT_TRUE(entries().empty()); - GetUpdates(); - ASSERT_TRUE(updates.empty()); + EXPECT_TRUE(entries().empty()); + EXPECT_TRUE(idmap().empty()); + EXPECT_TRUE(outgoing.empty()); } +INSTANTIATE_TEST_CASE_P(StorageTests, StorageTest, ::testing::Bool()); +INSTANTIATE_TEST_CASE_P(StorageTestsPopulateOne, StorageTestPopulateOne, + ::testing::Bool()); +INSTANTIATE_TEST_CASE_P(StorageTestsPopulated, StorageTestPopulated, + ::testing::Bool()); +INSTANTIATE_TEST_CASE_P(StorageTestsPersistent, StorageTestPersistent, + ::testing::Bool()); + } // namespace nt