Storage: Use StringMap instead of ConcurrentQueue for updates.

Updates are merged or themselves updated as user code changes.  This avoids
the need for the dispatcher to do this and also avoids the need for disabling
updates when the dispatcher isn't running, because there's no risk of memory
usage piling up.
This commit is contained in:
Peter Johnson
2015-07-26 20:41:48 -07:00
parent fb1b82e239
commit a0d30ffef1
3 changed files with 189 additions and 74 deletions

View File

@@ -18,11 +18,74 @@ using namespace nt;
ATOMIC_STATIC_INIT(Storage)
Storage::Storage() {
m_updates_enabled = false;
m_updates_delete_all = false;
}
Storage::~Storage() {}
void Storage::GetUpdates(UpdateMap* updates, bool* delete_all) {
std::lock_guard<std::mutex> lock(m_mutex);
m_updates.swap(*updates);
m_updates.clear();
*delete_all = m_updates_delete_all;
m_updates_delete_all = false;
}
// Must be called with mutex already held
void Storage::AddUpdate(std::shared_ptr<StorageEntry> entry,
Update::Kind kind) {
if (kind == Update::kDeleteAll) {
m_updates_delete_all = true;
m_updates.clear();
return;
}
auto& update = m_updates[entry->name()];
switch (kind) {
case Update::kAssign:
update.entry = entry;
update.value = entry->value();
update.flags = entry->flags();
update.kind = Update::kAssign;
break;
case Update::kValueUpdate:
if (update.kind == Update::kNone) {
update.entry = entry;
update.kind = Update::kValueUpdate;
} else if (update.kind == Update::kFlagsUpdate) {
// Merge value+flags updates as we can only have a single update
// entry.
update.kind = Update::kValueFlagsUpdate;
} else if (update.kind == Update::kDelete) {
// Change delete + update into assign
update.kind = Update::kAssign;
update.flags = entry->flags();
}
update.value = entry->value();
break;
case Update::kFlagsUpdate:
if (update.kind == Update::kNone) {
update.entry = entry;
update.kind = Update::kFlagsUpdate;
} else if (update.kind == Update::kValueUpdate) {
// Merge value+flags updates as we can only have a single update
// entry.
update.kind = Update::kValueFlagsUpdate;
} else if (update.kind == Update::kDelete) {
return; // can't do a raw flags update
}
update.flags = entry->flags();
break;
case Update::kDelete:
update.kind = Update::kDelete;
update.entry = entry;
update.value = nullptr; // release reference if any
break;
default:
assert(false && "unexpected update kind");
break;
}
}
std::shared_ptr<StorageEntry> Storage::FindEntry(StringRef name) const {
std::lock_guard<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);

View File

@@ -87,21 +87,29 @@ class Storage {
~Storage();
struct Update {
enum Kind { kAssign, kValueUpdate, kFlagsUpdate, kDelete, kDeleteAll };
Update(std::shared_ptr<StorageEntry> entry_, Kind kind_)
: entry(entry_), kind(kind_) {}
enum Kind {
kNone,
kAssign,
kValueUpdate,
kFlagsUpdate,
kValueFlagsUpdate,
kDelete,
kDeleteAll
};
Update() : flags(0), kind(kNone) {}
std::shared_ptr<StorageEntry> entry;
std::shared_ptr<Value> value;
unsigned int flags;
Kind kind;
};
typedef ConcurrentQueue<Update> UpdateQueue;
typedef llvm::StringMap<Update> UpdateMap;
// Finds, but does not create entry. Returns nullptr if not found.
std::shared_ptr<StorageEntry> FindEntry(StringRef name) const;
// Accessors required by Dispatcher.
void EnableUpdates() { m_updates_enabled = true; }
void DisableUpdates() { m_updates_enabled = false; }
UpdateQueue& updates() { return m_updates; }
void GetUpdates(UpdateMap* updates, bool* delete_all);
std::mutex& mutex() { return m_mutex; }
// User functions
@@ -124,17 +132,14 @@ class Storage {
Storage(const Storage&) = delete;
Storage& operator=(const Storage&) = delete;
template <typename... Args>
void AddUpdate(Args&&... args) {
if (m_updates_enabled) m_updates.emplace(std::forward<Args>(args)...);
}
void AddUpdate(std::shared_ptr<StorageEntry> entry, Update::Kind kind);
typedef llvm::StringMap<std::shared_ptr<StorageEntry>> EntriesMap;
mutable std::mutex m_mutex;
EntriesMap m_entries;
UpdateQueue m_updates;
std::atomic_bool m_updates_enabled;
UpdateMap m_updates;
bool m_updates_delete_all;
ATOMIC_STATIC_DECL(Storage)
};

View File

@@ -16,24 +16,23 @@ namespace nt {
class StorageTest : public ::testing::Test {
public:
StorageTest() {
storage.EnableUpdates();
}
Storage::EntriesMap& entries() { return storage.m_entries; }
Storage::UpdateQueue& updates() { return storage.updates(); }
void GetUpdates() { storage.GetUpdates(&updates, &delete_all); }
std::shared_ptr<StorageEntry> GetEntry(StringRef name) {
auto& entry = storage.m_entries[name];
if (!entry) entry = std::make_shared<StorageEntry>(name);
return entry;
}
Storage storage;
Storage::UpdateMap updates;
bool delete_all;
};
class StorageTestPopulateOne : public StorageTest {
public:
StorageTestPopulateOne() {
storage.SetEntryTypeValue("foo", Value::MakeBoolean(true));
while (!updates().empty()) updates().pop();
GetUpdates();
}
};
@@ -44,7 +43,7 @@ class StorageTestPopulated : public StorageTest {
storage.SetEntryTypeValue("foo2", Value::MakeDouble(0.0));
storage.SetEntryTypeValue("bar", Value::MakeDouble(1.0));
storage.SetEntryTypeValue("bar2", Value::MakeBoolean(false));
while (!updates().empty()) updates().pop();
GetUpdates();
}
};
@@ -87,7 +86,7 @@ class StorageTestPersistent : public StorageTest {
Value::MakeStringArray(std::vector<std::string>{"hello", "world\n"}));
storage.SetEntryTypeValue(StringRef("\0\3\5\n", 4),
Value::MakeBoolean(true));
while (!updates().empty()) updates().pop();
GetUpdates();
}
};
@@ -98,7 +97,9 @@ class MockLoadWarn {
TEST_F(StorageTest, Construct) {
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
ASSERT_TRUE(updates.empty());
ASSERT_FALSE(delete_all);
}
TEST_F(StorageTest, FindEntryNotExist) {
@@ -134,9 +135,9 @@ TEST_F(StorageTest, GetEntryValueNotExist) {
TEST_F(StorageTest, GetEntryValueExist) {
auto value = Value::MakeBoolean(true);
storage.SetEntryTypeValue("foo", value);
ASSERT_EQ(1u, updates().size());
updates().pop();
ASSERT_EQ(value, storage.GetEntryValue("foo"));
GetUpdates();
EXPECT_EQ(1u, updates.size());
EXPECT_EQ(updates["foo"].value, storage.GetEntryValue("foo"));
}
TEST_F(StorageTest, SetEntryTypeValueAssignNew) {
@@ -145,8 +146,10 @@ TEST_F(StorageTest, SetEntryTypeValueAssignNew) {
storage.SetEntryTypeValue("foo", value);
auto entry = GetEntry("foo");
EXPECT_EQ(value, entry->value());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
GetUpdates();
EXPECT_EQ(1u, updates.size());
EXPECT_FALSE(delete_all);
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kAssign, update.kind);
}
@@ -157,8 +160,10 @@ TEST_F(StorageTestPopulateOne, SetEntryTypeValueAssignTypeChange) {
storage.SetEntryTypeValue("foo", value);
auto entry = GetEntry("foo");
EXPECT_EQ(value, entry->value());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
GetUpdates();
EXPECT_EQ(1u, updates.size());
EXPECT_FALSE(delete_all);
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kAssign, update.kind);
}
@@ -170,7 +175,9 @@ TEST_F(StorageTestPopulateOne, SetEntryTypeValueEqualValue) {
storage.SetEntryTypeValue("foo", value);
auto entry = GetEntry("foo");
EXPECT_EQ(value, entry->value());
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
EXPECT_FALSE(delete_all);
}
TEST_F(StorageTestPopulateOne, SetEntryTypeValueDifferentValue) {
@@ -179,8 +186,10 @@ TEST_F(StorageTestPopulateOne, SetEntryTypeValueDifferentValue) {
storage.SetEntryTypeValue("foo", value);
auto entry = GetEntry("foo");
EXPECT_EQ(value, entry->value());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
GetUpdates();
EXPECT_EQ(1u, updates.size());
EXPECT_FALSE(delete_all);
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kValueUpdate, update.kind);
}
@@ -189,13 +198,17 @@ TEST_F(StorageTest, SetEntryTypeValueEmptyName) {
auto value = Value::MakeBoolean(true);
storage.SetEntryTypeValue("", value);
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
EXPECT_FALSE(delete_all);
}
TEST_F(StorageTest, SetEntryTypeValueEmptyValue) {
storage.SetEntryTypeValue("foo", nullptr);
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
EXPECT_FALSE(delete_all);
}
TEST_F(StorageTest, SetEntryValueAssignNew) {
@@ -204,8 +217,10 @@ TEST_F(StorageTest, SetEntryValueAssignNew) {
EXPECT_TRUE(storage.SetEntryValue("foo", value));
auto entry = GetEntry("foo");
EXPECT_EQ(value, entry->value());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
GetUpdates();
EXPECT_EQ(1u, updates.size());
EXPECT_FALSE(delete_all);
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kAssign, update.kind);
}
@@ -216,7 +231,9 @@ TEST_F(StorageTestPopulateOne, SetEntryValueAssignTypeChange) {
EXPECT_FALSE(storage.SetEntryValue("foo", value));
auto entry = GetEntry("foo");
EXPECT_NE(value, entry->value());
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
EXPECT_FALSE(delete_all);
}
TEST_F(StorageTestPopulateOne, SetEntryValueEqualValue) {
@@ -226,7 +243,9 @@ TEST_F(StorageTestPopulateOne, SetEntryValueEqualValue) {
EXPECT_TRUE(storage.SetEntryValue("foo", value));
auto entry = GetEntry("foo");
EXPECT_EQ(value, entry->value());
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
EXPECT_FALSE(delete_all);
}
TEST_F(StorageTestPopulateOne, SetEntryValueDifferentValue) {
@@ -235,8 +254,10 @@ TEST_F(StorageTestPopulateOne, SetEntryValueDifferentValue) {
EXPECT_TRUE(storage.SetEntryValue("foo", value));
auto entry = GetEntry("foo");
EXPECT_EQ(value, entry->value());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
GetUpdates();
EXPECT_EQ(1u, updates.size());
EXPECT_FALSE(delete_all);
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kValueUpdate, update.kind);
}
@@ -245,20 +266,26 @@ TEST_F(StorageTest, SetEntryValueEmptyName) {
auto value = Value::MakeBoolean(true);
EXPECT_TRUE(storage.SetEntryValue("", value));
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
EXPECT_FALSE(delete_all);
}
TEST_F(StorageTest, SetEntryValueEmptyValue) {
EXPECT_TRUE(storage.SetEntryValue("foo", nullptr));
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
EXPECT_FALSE(delete_all);
}
TEST_F(StorageTest, SetEntryFlagsNew) {
// flags setting doesn't create an entry
storage.SetEntryFlags("foo", 0u);
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
EXPECT_FALSE(delete_all);
}
TEST_F(StorageTestPopulateOne, SetEntryFlagsEqualValue) {
@@ -267,7 +294,9 @@ TEST_F(StorageTestPopulateOne, SetEntryFlagsEqualValue) {
storage.SetEntryFlags("foo", 0u);
auto entry = GetEntry("foo");
EXPECT_EQ(0u, entry->flags());
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
EXPECT_FALSE(delete_all);
}
TEST_F(StorageTestPopulateOne, SetEntryFlagsDifferentValue) {
@@ -275,8 +304,9 @@ TEST_F(StorageTestPopulateOne, SetEntryFlagsDifferentValue) {
storage.SetEntryFlags("foo", 1u);
auto entry = GetEntry("foo");
EXPECT_EQ(1u, entry->flags());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
GetUpdates();
EXPECT_EQ(1u, updates.size());
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kFlagsUpdate, update.kind);
}
@@ -284,47 +314,52 @@ TEST_F(StorageTestPopulateOne, SetEntryFlagsDifferentValue) {
TEST_F(StorageTest, SetEntryFlagsEmptyName) {
storage.SetEntryFlags("", 0u);
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
}
TEST_F(StorageTest, GetEntryFlagsNotExist) {
ASSERT_EQ(0u, storage.GetEntryFlags("foo"));
ASSERT_TRUE(entries().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
}
TEST_F(StorageTestPopulateOne, GetEntryFlagsExist) {
storage.SetEntryFlags("foo", 1u);
while (!updates().empty()) updates().pop();
GetUpdates();
ASSERT_EQ(1u, storage.GetEntryFlags("foo"));
}
TEST_F(StorageTest, DeleteEntryNotExist) {
storage.DeleteEntry("foo");
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
}
TEST_F(StorageTestPopulateOne, DeleteEntryExist) {
auto entry = GetEntry("foo");
storage.DeleteEntry("foo");
ASSERT_TRUE(entries().empty());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
GetUpdates();
EXPECT_EQ(1u, updates.size());
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kDelete, update.kind);
}
TEST_F(StorageTest, DeleteAllEntriesEmpty) {
storage.DeleteAllEntries();
ASSERT_TRUE(updates().empty());
GetUpdates();
EXPECT_TRUE(updates.empty());
}
TEST_F(StorageTestPopulated, DeleteAllEntries) {
storage.DeleteAllEntries();
ASSERT_TRUE(entries().empty());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
EXPECT_EQ(nullptr, update.entry);
EXPECT_EQ(Storage::Update::kDeleteAll, update.kind);
GetUpdates();
EXPECT_TRUE(updates.empty());
ASSERT_TRUE(delete_all);
}
TEST_F(StorageTestPopulated, GetEntryInfoAll) {
@@ -443,7 +478,8 @@ TEST_F(StorageTest, LoadPersistentBadHeader) {
EXPECT_CALL(warn, Warn(1, "header line mismatch, ignoring rest of file"));
EXPECT_FALSE(storage.LoadPersistent(iss2, warn_func));
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
ASSERT_TRUE(updates.empty());
}
TEST_F(StorageTest, LoadPersistentCommentHeader) {
@@ -455,7 +491,8 @@ TEST_F(StorageTest, LoadPersistentCommentHeader) {
"\n; comment\n# comment\n[NetworkTables Storage 3.0]\n");
EXPECT_TRUE(storage.LoadPersistent(iss, warn_func));
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
ASSERT_TRUE(updates.empty());
}
TEST_F(StorageTest, LoadPersistentEmptyName) {
@@ -467,7 +504,8 @@ TEST_F(StorageTest, LoadPersistentEmptyName) {
"[NetworkTables Storage 3.0]\nboolean \"\"=true\n");
EXPECT_TRUE(storage.LoadPersistent(iss, warn_func));
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
ASSERT_TRUE(updates.empty());
}
TEST_F(StorageTest, LoadPersistentAssign) {
@@ -481,10 +519,13 @@ TEST_F(StorageTest, LoadPersistentAssign) {
auto entry = GetEntry("foo");
EXPECT_EQ(*Value::MakeBoolean(true), *entry->value());
EXPECT_EQ(NT_PERSISTENT, entry->flags());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
GetUpdates();
EXPECT_EQ(1u, updates.size());
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kAssign, update.kind);
EXPECT_EQ(entry->value(), update.value);
EXPECT_EQ(NT_PERSISTENT, update.flags);
}
TEST_F(StorageTestPopulateOne, LoadPersistentUpdateFlags) {
@@ -498,10 +539,12 @@ TEST_F(StorageTestPopulateOne, LoadPersistentUpdateFlags) {
auto entry = GetEntry("foo");
EXPECT_EQ(*Value::MakeBoolean(true), *entry->value());
EXPECT_EQ(NT_PERSISTENT, entry->flags());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
GetUpdates();
ASSERT_EQ(1u, updates.size());
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kFlagsUpdate, update.kind);
EXPECT_EQ(NT_PERSISTENT, update.flags);
}
TEST_F(StorageTestPopulateOne, LoadPersistentUpdateValue) {
@@ -510,7 +553,7 @@ TEST_F(StorageTestPopulateOne, LoadPersistentUpdateValue) {
[&](std::size_t line, const char* msg) { warn.Warn(line, msg); };
GetEntry("foo")->set_flags(NT_PERSISTENT);
while (!updates().empty()) updates().pop();
GetUpdates();
std::istringstream iss(
"[NetworkTables Storage 3.0]\nboolean \"foo\"=false\n");
@@ -518,10 +561,12 @@ TEST_F(StorageTestPopulateOne, LoadPersistentUpdateValue) {
auto entry = GetEntry("foo");
EXPECT_EQ(*Value::MakeBoolean(false), *entry->value());
EXPECT_EQ(NT_PERSISTENT, entry->flags());
ASSERT_EQ(1u, updates().size());
auto update = updates().pop();
GetUpdates();
EXPECT_EQ(1u, updates.size());
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kValueUpdate, update.kind);
EXPECT_EQ(entry->value(), update.value);
}
TEST_F(StorageTestPopulateOne, LoadPersistentUpdateValueFlags) {
@@ -535,13 +580,13 @@ TEST_F(StorageTestPopulateOne, LoadPersistentUpdateValueFlags) {
auto entry = GetEntry("foo");
EXPECT_EQ(*Value::MakeBoolean(false), *entry->value());
EXPECT_EQ(NT_PERSISTENT, entry->flags());
ASSERT_EQ(2u, updates().size());
auto update = updates().pop();
GetUpdates();
ASSERT_EQ(1u, updates.size());
auto& update = updates["foo"];
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kValueUpdate, update.kind);
update = updates().pop();
EXPECT_EQ(entry, update.entry);
EXPECT_EQ(Storage::Update::kFlagsUpdate, update.kind);
EXPECT_EQ(Storage::Update::kValueFlagsUpdate, update.kind);
EXPECT_EQ(entry->value(), update.value);
EXPECT_EQ(NT_PERSISTENT, update.flags);
}
TEST_F(StorageTest, LoadPersistent) {
@@ -575,7 +620,8 @@ TEST_F(StorageTest, LoadPersistent) {
std::istringstream iss(in);
EXPECT_TRUE(storage.LoadPersistent(iss, warn_func));
ASSERT_EQ(21u, entries().size());
ASSERT_EQ(21u, updates().size());
GetUpdates();
EXPECT_EQ(21u, updates.size());
EXPECT_EQ(*Value::MakeBoolean(true), *storage.GetEntryValue("boolean/true"));
EXPECT_EQ(*Value::MakeBoolean(false),
@@ -629,7 +675,8 @@ TEST_F(StorageTest, LoadPersistentWarn) {
EXPECT_TRUE(storage.LoadPersistent(iss, warn_func));
ASSERT_TRUE(entries().empty());
ASSERT_TRUE(updates().empty());
GetUpdates();
ASSERT_TRUE(updates.empty());
}
} // namespace nt