From 0a18d2e57b771a649c00bce91b8faae509080043 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Sun, 19 Jul 2015 16:02:21 -0700 Subject: [PATCH] Storage: Make individual entries thread safe. Also use shared_ptr to refer to entries rather than names. --- src/Storage.cpp | 128 ++++++++++++++++++++++++++---------------------- src/Storage.h | 58 +++++++++++++++++----- 2 files changed, 116 insertions(+), 70 deletions(-) diff --git a/src/Storage.cpp b/src/Storage.cpp index 88039d12f0..11e19c2b17 100644 --- a/src/Storage.cpp +++ b/src/Storage.cpp @@ -21,79 +21,85 @@ Storage::Storage() {} Storage::~Storage() {} -std::shared_ptr Storage::GetEntryValue(StringRef name) const { +std::shared_ptr Storage::FindEntry(StringRef name) const { std::lock_guard lock(m_mutex); auto i = m_entries.find(name); - if (i == m_entries.end()) - return nullptr; - return i->getValue().value; + return i == m_entries.end() ? nullptr : i->getValue(); +} + +std::shared_ptr Storage::GetEntry(StringRef name) { + std::lock_guard lock(m_mutex); + auto entry = m_entries[name]; + if (!entry) entry.reset(new StorageEntry); + return entry; +} + +std::shared_ptr Storage::GetEntryValue(StringRef name) const { + auto entry = FindEntry(name); + if (!entry) return nullptr; + return entry->value(); } bool Storage::SetEntryValue(StringRef name, std::shared_ptr value) { if (name.empty()) return true; if (!value) return true; - std::lock_guard lock(m_mutex); - auto& entry = m_entries[name]; - if (entry.value && entry.value->type() != value->type()) + auto entry = GetEntry(name); + auto old_value = entry->value(); + if (old_value && old_value->type() != value->type()) return false; // error on type mismatch - if (!entry.value || *entry.value != *value) - m_updates.push(Update{name, Update::kValueUpdate}); // put on update queue - entry.value = value; + entry->set_value(value); + // put on update queue + if (!old_value) + m_updates.push(Update{entry, Update::kAssign}); + else if (*old_value != *value) + m_updates.push(Update{entry, Update::kValueUpdate}); return true; } void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr value) { if (name.empty()) return; if (!value) return; - std::lock_guard lock(m_mutex); - auto& entry = m_entries[name]; - if (!entry.value || *entry.value != *value) { + auto entry = GetEntry(name); + auto old_value = entry->value(); + entry->set_value(value); + if (!old_value || *old_value != *value) { // put on update queue - if (!entry.value || entry.value->type() != value->type()) - m_updates.push(Update{name, Update::kAssign}); + if (!old_value || old_value->type() != value->type()) + m_updates.push(Update{entry, Update::kAssign}); else - m_updates.push(Update{name, Update::kValueUpdate}); + m_updates.push(Update{entry, Update::kValueUpdate}); } - entry.value = value; } void Storage::SetEntryFlags(StringRef name, unsigned int flags) { std::lock_guard lock(m_mutex); - auto i = m_entries.find(name); - if (i == m_entries.end()) - return; - auto& entry = i->getValue(); - if (entry.flags != flags) - m_updates.push(Update{name, Update::kFlagsUpdate}); // put on update queue - entry.flags = flags; + auto entry = FindEntry(name); + if (!entry) return; + if (entry->flags() != flags) { + entry->set_flags(flags); + m_updates.push(Update{entry, Update::kFlagsUpdate}); // put on update queue + } } unsigned int Storage::GetEntryFlags(StringRef name) const { - std::lock_guard lock(m_mutex); - auto i = m_entries.find(name); - if (i == m_entries.end()) - return 0; - return i->getValue().flags; + auto entry = FindEntry(name); + if (!entry) return 0; + return entry->flags(); } void Storage::DeleteEntry(StringRef name) { - std::lock_guard lock(m_mutex); - auto i = m_entries.find(name); - if (i == m_entries.end()) - return; - auto& entry = i->getValue(); - if (entry.value) - m_updates.push(Update{name, Update::kDelete}); // put on update queue - entry.value = nullptr; + auto entry = FindEntry(name); + if (!entry) return; + if (entry->value()) { + entry->set_value(nullptr); + m_updates.push(Update{entry, Update::kDelete}); // put on update queue + } } void Storage::DeleteAllEntries() { std::lock_guard lock(m_mutex); - for (auto& i : m_entries) { - auto& entry = i.getValue(); - if (entry.value) entry.value = nullptr; - } - m_updates.push(Update{"", Update::kDeleteAll}); // put on update queue + for (auto& i : m_entries) i.getValue()->set_value(nullptr); + m_updates.push(Update{nullptr, Update::kDeleteAll}); // put on update queue } std::vector Storage::GetEntryInfo(StringRef prefix, @@ -102,13 +108,14 @@ std::vector Storage::GetEntryInfo(StringRef prefix, std::vector infos; for (auto& i : m_entries) { if (!i.getKey().startswith(prefix)) continue; - auto& entry = i.getValue(); - if (!entry.value) continue; + auto entry = i.getValue(); + auto value = entry->value(); + if (!value) continue; EntryInfo info; info.name = i.getKey(); - info.type = entry.value->type(); - info.flags = entry.flags; - info.last_change = entry.value->last_change(); + info.type = value->type(); + info.flags = entry->flags(); + info.last_change = value->last_change(); infos.push_back(std::move(info)); } return infos; @@ -153,10 +160,10 @@ void Storage::SavePersistent(std::ostream& os) const { std::lock_guard lock(m_mutex); entries.reserve(m_entries.size()); for (auto& i : m_entries) { - const StorageEntry& entry = i.getValue(); + auto entry = i.getValue(); // only write persistent-flagged values - if (!entry.IsPersistent()) continue; - entries.push_back(std::make_pair(i.getKey(), entry.value)); + if (!entry->IsPersistent()) continue; + entries.push_back(std::make_pair(i.getKey(), entry->value())); } } @@ -542,18 +549,21 @@ next_line: { std::lock_guard lock(m_mutex); for (auto& i : entries) { - auto& entry = m_entries[i.first]; + auto entry = m_entries[i.first]; + if (!entry) entry.reset(new StorageEntry); + auto old_value = entry->value(); + entry->set_value(i.second); // put on update queue - if (!entry.value || entry.value->type() != i.second->type()) - m_updates.push(Update{i.first, Update::kAssign}); - else if (*entry.value != *i.second) - m_updates.push(Update{i.first, Update::kValueUpdate}); - if (!entry.IsPersistent()) - m_updates.push(Update{std::move(i.first), Update::kFlagsUpdate}); + if (!old_value || old_value->type() != i.second->type()) + m_updates.push(Update{entry, Update::kAssign}); + else if (*old_value != *i.second) + m_updates.push(Update{entry, Update::kValueUpdate}); - entry.value = i.second; - entry.flags |= NT_PERSISTENT; + if (!entry->IsPersistent()) { + entry->set_flags(entry->flags() | NT_PERSISTENT); + m_updates.push(Update{entry, Update::kFlagsUpdate}); + } } } diff --git a/src/Storage.h b/src/Storage.h index 8a7ec4cdf3..3a28b36d0e 100644 --- a/src/Storage.h +++ b/src/Storage.h @@ -8,6 +8,7 @@ #ifndef NT_STORAGE_H_ #define NT_STORAGE_H_ +#include #include #include #include @@ -23,14 +24,49 @@ namespace nt { class StorageEntry { public: - StorageEntry() : id(0xffff), flags(0) {} + StorageEntry() : m_flags(0), m_id(0xffff) {} - bool IsPersistent() const { return (flags & NT_PERSISTENT) != 0; } + bool IsPersistent() const { return (m_flags & NT_PERSISTENT) != 0; } - std::shared_ptr value; - unsigned int id; - unsigned int flags; - SequenceNumber seq_num; + std::shared_ptr value() { +#ifdef HAVE_SHARED_PTR_ATOMIC_LOAD + return std::atomic_load(&m_value); +#else + std::lock_guard lock(m_value_mutex); + return m_value; +#endif + } + void set_value(std::shared_ptr value) { +#ifdef HAVE_SHARED_PTR_ATOMIC_LOAD + std::atomic_store(&m_value, value); +#else + std::lock_guard lock(m_value_mutex); + m_value = value; +#endif + } + + unsigned int flags() const { return m_flags; } + void set_flags(unsigned int flags) { m_flags = flags; } + + unsigned int id() const { return m_id; } + void set_id(unsigned int id) { m_id = id; } + + SequenceNumber seq_num() const { return m_seq_num; } + void set_seq_num(SequenceNumber seq_num) { m_seq_num = seq_num; } + + private: + // These variables are accessed by both Dispatcher and user, so must use + // atomic accesses. Unfortunately, atomic shared_ptr is not yet available + // on most compilers, so we need an explicit mutex instead. +#ifndef HAVE_SHARED_PTR_ATOMIC_LOAD + std::mutex m_value_mutex; +#endif + std::shared_ptr m_value; + std::atomic_uint m_flags; + + // Only accessed from the Dispatcher, so these are NOT mutex-protected. + unsigned int m_id; + SequenceNumber m_seq_num; }; class Storage { @@ -41,18 +77,18 @@ class Storage { } ~Storage(); - typedef llvm::StringMap EntriesMap; + typedef llvm::StringMap> EntriesMap; struct Update { - std::string name; + std::shared_ptr entry; enum Kind { kAssign, kValueUpdate, kFlagsUpdate, kDelete, kDeleteAll }; Kind kind; }; typedef ConcurrentQueue UpdateQueue; - std::mutex& mutex() { return m_mutex; } - EntriesMap& entries() { return m_entries; } - const EntriesMap& entries() const { return m_entries; } + // Finds, but does not create entry. Returns nullptr if not found. + std::shared_ptr FindEntry(StringRef name) const; + std::shared_ptr GetEntry(StringRef name); UpdateQueue& updates() { return m_updates; }