Storage: Make individual entries thread safe.

Also use shared_ptr to refer to entries rather than names.
This commit is contained in:
Peter Johnson
2015-07-19 16:02:21 -07:00
parent c08e2ed8fc
commit 0a18d2e57b
2 changed files with 116 additions and 70 deletions

View File

@@ -21,79 +21,85 @@ Storage::Storage() {}
Storage::~Storage() {}
std::shared_ptr<Value> Storage::GetEntryValue(StringRef name) const {
std::shared_ptr<StorageEntry> Storage::FindEntry(StringRef name) const {
std::lock_guard<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
if (i == m_entries.end())
return nullptr;
return i->getValue().value;
return i == m_entries.end() ? nullptr : i->getValue();
}
std::shared_ptr<StorageEntry> Storage::GetEntry(StringRef name) {
std::lock_guard<std::mutex> lock(m_mutex);
auto entry = m_entries[name];
if (!entry) entry.reset(new StorageEntry);
return entry;
}
std::shared_ptr<Value> Storage::GetEntryValue(StringRef name) const {
auto entry = FindEntry(name);
if (!entry) return nullptr;
return entry->value();
}
bool Storage::SetEntryValue(StringRef name, std::shared_ptr<Value> value) {
if (name.empty()) return true;
if (!value) return true;
std::lock_guard<std::mutex> lock(m_mutex);
auto& entry = m_entries[name];
if (entry.value && entry.value->type() != value->type())
auto entry = GetEntry(name);
auto old_value = entry->value();
if (old_value && old_value->type() != value->type())
return false; // error on type mismatch
if (!entry.value || *entry.value != *value)
m_updates.push(Update{name, Update::kValueUpdate}); // put on update queue
entry.value = value;
entry->set_value(value);
// put on update queue
if (!old_value)
m_updates.push(Update{entry, Update::kAssign});
else if (*old_value != *value)
m_updates.push(Update{entry, Update::kValueUpdate});
return true;
}
void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value) {
if (name.empty()) return;
if (!value) return;
std::lock_guard<std::mutex> lock(m_mutex);
auto& entry = m_entries[name];
if (!entry.value || *entry.value != *value) {
auto entry = GetEntry(name);
auto old_value = entry->value();
entry->set_value(value);
if (!old_value || *old_value != *value) {
// put on update queue
if (!entry.value || entry.value->type() != value->type())
m_updates.push(Update{name, Update::kAssign});
if (!old_value || old_value->type() != value->type())
m_updates.push(Update{entry, Update::kAssign});
else
m_updates.push(Update{name, Update::kValueUpdate});
m_updates.push(Update{entry, Update::kValueUpdate});
}
entry.value = value;
}
void Storage::SetEntryFlags(StringRef name, unsigned int flags) {
std::lock_guard<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
if (i == m_entries.end())
return;
auto& entry = i->getValue();
if (entry.flags != flags)
m_updates.push(Update{name, Update::kFlagsUpdate}); // put on update queue
entry.flags = flags;
auto entry = FindEntry(name);
if (!entry) return;
if (entry->flags() != flags) {
entry->set_flags(flags);
m_updates.push(Update{entry, Update::kFlagsUpdate}); // put on update queue
}
}
unsigned int Storage::GetEntryFlags(StringRef name) const {
std::lock_guard<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
if (i == m_entries.end())
return 0;
return i->getValue().flags;
auto entry = FindEntry(name);
if (!entry) return 0;
return entry->flags();
}
void Storage::DeleteEntry(StringRef name) {
std::lock_guard<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
if (i == m_entries.end())
return;
auto& entry = i->getValue();
if (entry.value)
m_updates.push(Update{name, Update::kDelete}); // put on update queue
entry.value = nullptr;
auto entry = FindEntry(name);
if (!entry) return;
if (entry->value()) {
entry->set_value(nullptr);
m_updates.push(Update{entry, Update::kDelete}); // put on update queue
}
}
void Storage::DeleteAllEntries() {
std::lock_guard<std::mutex> lock(m_mutex);
for (auto& i : m_entries) {
auto& entry = i.getValue();
if (entry.value) entry.value = nullptr;
}
m_updates.push(Update{"", Update::kDeleteAll}); // put on update queue
for (auto& i : m_entries) i.getValue()->set_value(nullptr);
m_updates.push(Update{nullptr, Update::kDeleteAll}); // put on update queue
}
std::vector<EntryInfo> Storage::GetEntryInfo(StringRef prefix,
@@ -102,13 +108,14 @@ std::vector<EntryInfo> Storage::GetEntryInfo(StringRef prefix,
std::vector<EntryInfo> infos;
for (auto& i : m_entries) {
if (!i.getKey().startswith(prefix)) continue;
auto& entry = i.getValue();
if (!entry.value) continue;
auto entry = i.getValue();
auto value = entry->value();
if (!value) continue;
EntryInfo info;
info.name = i.getKey();
info.type = entry.value->type();
info.flags = entry.flags;
info.last_change = entry.value->last_change();
info.type = value->type();
info.flags = entry->flags();
info.last_change = value->last_change();
infos.push_back(std::move(info));
}
return infos;
@@ -153,10 +160,10 @@ void Storage::SavePersistent(std::ostream& os) const {
std::lock_guard<std::mutex> lock(m_mutex);
entries.reserve(m_entries.size());
for (auto& i : m_entries) {
const StorageEntry& entry = i.getValue();
auto entry = i.getValue();
// only write persistent-flagged values
if (!entry.IsPersistent()) continue;
entries.push_back(std::make_pair(i.getKey(), entry.value));
if (!entry->IsPersistent()) continue;
entries.push_back(std::make_pair(i.getKey(), entry->value()));
}
}
@@ -542,18 +549,21 @@ next_line:
{
std::lock_guard<std::mutex> lock(m_mutex);
for (auto& i : entries) {
auto& entry = m_entries[i.first];
auto entry = m_entries[i.first];
if (!entry) entry.reset(new StorageEntry);
auto old_value = entry->value();
entry->set_value(i.second);
// put on update queue
if (!entry.value || entry.value->type() != i.second->type())
m_updates.push(Update{i.first, Update::kAssign});
else if (*entry.value != *i.second)
m_updates.push(Update{i.first, Update::kValueUpdate});
if (!entry.IsPersistent())
m_updates.push(Update{std::move(i.first), Update::kFlagsUpdate});
if (!old_value || old_value->type() != i.second->type())
m_updates.push(Update{entry, Update::kAssign});
else if (*old_value != *i.second)
m_updates.push(Update{entry, Update::kValueUpdate});
entry.value = i.second;
entry.flags |= NT_PERSISTENT;
if (!entry->IsPersistent()) {
entry->set_flags(entry->flags() | NT_PERSISTENT);
m_updates.push(Update{entry, Update::kFlagsUpdate});
}
}
}