Make StorageEntry a struct and move into Storage class.

This commit is contained in:
Peter Johnson
2015-07-31 13:48:33 -07:00
parent 138ebf5b4d
commit cde7782c21
3 changed files with 118 additions and 136 deletions

View File

@@ -36,7 +36,7 @@ void Storage::ClearOutgoing() {
NT_Type Storage::GetEntryType(unsigned int id) const {
std::lock_guard<std::mutex> lock(m_mutex);
if (id >= m_idmap.size()) return NT_UNASSIGNED;
auto value = m_idmap[id]->value();
auto value = m_idmap[id]->value;
if (!value) return NT_UNASSIGNED;
return value->type();
}
@@ -57,7 +57,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
case Message::kEntryAssign: {
unsigned int id = msg->id();
StringRef name = msg->str();
std::shared_ptr<StorageEntry> entry;
std::shared_ptr<Entry> entry;
bool may_need_update = false;
if (m_server) {
// if we're a server, id=0xffff requests are requests for an id
@@ -70,18 +70,18 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// create it locally
id = m_idmap.size();
auto& new_entry = m_entries[name];
if (!new_entry) new_entry = std::make_shared<StorageEntry>(name);
if (!new_entry) new_entry = std::make_shared<Entry>(name);
entry = new_entry;
entry->set_value(msg->value());
entry->set_flags(msg->flags());
entry->set_id(id);
entry->value = msg->value();
entry->flags = msg->flags();
entry->id = id;
m_idmap.push_back(entry);
// send the assignment to everyone (including the originator)
if (m_queue_outgoing) {
auto queue_outgoing = m_queue_outgoing;
auto outmsg = Message::EntryAssign(
name, id, entry->seq_num().value(), msg->value(), msg->flags());
name, id, entry->seq_num.value(), msg->value(), msg->flags());
lock.unlock();
queue_outgoing(outmsg, nullptr, nullptr);
}
@@ -110,13 +110,13 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
if (!new_entry) {
// didn't exist at all (rather than just being a response to a
// id assignment request)
new_entry = std::make_shared<StorageEntry>(name);
new_entry->set_value(msg->value());
new_entry->set_flags(msg->flags());
new_entry = std::make_shared<Entry>(name);
new_entry->value = msg->value();
new_entry->flags = msg->flags();
} else
may_need_update = true; // we may need to send an update message
entry = new_entry;
entry->set_id(id);
entry->id = id;
m_idmap[id] = entry;
return;
}
@@ -126,11 +126,11 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// already exists; ignore if sequence number not higher than local
SequenceNumber seq_num(msg->seq_num_uid());
if (seq_num <= entry->seq_num()) {
if (seq_num <= entry->seq_num) {
if (may_need_update) {
auto queue_outgoing = m_queue_outgoing;
auto msg = Message::EntryUpdate(entry->id(), entry->seq_num().value(),
entry->value());
auto msg = Message::EntryUpdate(entry->id, entry->seq_num.value(),
entry->value);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
@@ -138,26 +138,26 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
}
// sanity check: name should match id
if (msg->str() != entry->name()) {
if (msg->str() != entry->name) {
lock.unlock();
DEBUG("entry assignment for same id with different name?");
return;
}
// update local
entry->set_value(msg->value());
entry->set_seq_num(seq_num);
entry->value = msg->value();
entry->seq_num = seq_num;
// don't update flags from a <3.0 remote (not part of message)
if (proto_rev >= 0x0300) entry->set_flags(msg->flags());
if (proto_rev >= 0x0300) entry->flags = msg->flags();
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
if (m_server && m_queue_outgoing) {
auto queue_outgoing = m_queue_outgoing;
auto outmsg =
Message::EntryAssign(entry->name(), id, msg->seq_num_uid(),
msg->value(), entry->flags());
Message::EntryAssign(entry->name, id, msg->seq_num_uid(),
msg->value(), entry->flags);
lock.unlock();
queue_outgoing(outmsg, nullptr, conn);
}
@@ -176,11 +176,11 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
// ignore if sequence number not higher than local
SequenceNumber seq_num(msg->seq_num_uid());
if (seq_num <= entry->seq_num()) return;
if (seq_num <= entry->seq_num) return;
// update local
entry->set_value(msg->value());
entry->set_seq_num(seq_num);
entry->value = msg->value();
entry->seq_num = seq_num;
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
@@ -203,7 +203,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
auto& entry = m_idmap[id];
// update local
entry->set_flags(msg->flags());
entry->flags = msg->flags();
// broadcast to all other connections (note for client there won't
// be any other connections, so don't bother)
@@ -226,7 +226,7 @@ void Storage::ProcessIncoming(std::shared_ptr<Message> msg,
auto& entry = m_idmap[id];
// update local
m_entries.erase(entry->name()); // erase from map
m_entries.erase(entry->name); // erase from map
entry.reset(); // delete it from idmap too
// broadcast to all other connections (note for client there won't
@@ -268,10 +268,10 @@ void Storage::SendAssignments(
std::lock_guard<std::mutex> lock(m_mutex);
for (auto& i : m_entries) {
auto entry = i.getValue();
msgs.emplace_back(Message::EntryAssign(i.getKey(), entry->id(),
entry->seq_num().value(),
entry->value(), entry->flags()));
if (!m_server && reset_ids) entry->set_id(0xffff);
msgs.emplace_back(Message::EntryAssign(i.getKey(), entry->id,
entry->seq_num.value(),
entry->value, entry->flags));
if (!m_server && reset_ids) entry->id = 0xffff;
}
if (!m_server && reset_ids) m_idmap.resize(0);
}
@@ -281,7 +281,7 @@ void Storage::SendAssignments(
std::shared_ptr<Value> Storage::GetEntryValue(StringRef name) const {
std::lock_guard<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
return i == m_entries.end() ? nullptr : i->getValue()->value();
return i == m_entries.end() ? nullptr : i->getValue()->value;
}
bool Storage::SetEntryValue(StringRef name, std::shared_ptr<Value> value) {
@@ -289,17 +289,17 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr<Value> value) {
if (!value) return true;
std::unique_lock<std::mutex> lock(m_mutex);
auto& new_entry = m_entries[name];
if (!new_entry) new_entry = std::make_shared<StorageEntry>(name);
if (!new_entry) new_entry = std::make_shared<Entry>(name);
auto entry = new_entry;
auto old_value = entry->value();
auto old_value = entry->value;
if (old_value && old_value->type() != value->type())
return false; // error on type mismatch
entry->set_value(value);
entry->value = value;
// if we're the server, assign an id if it doesn't have one
if (m_server && entry->id() == 0xffff) {
if (m_server && entry->id == 0xffff) {
unsigned int id = m_idmap.size();
entry->set_id(id);
entry->id = id;
m_idmap.push_back(entry);
}
@@ -307,16 +307,16 @@ bool Storage::SetEntryValue(StringRef name, std::shared_ptr<Value> value) {
if (!m_queue_outgoing) return true;
auto queue_outgoing = m_queue_outgoing;
if (!old_value) {
auto msg = Message::EntryAssign(name, entry->id(), entry->seq_num().value(),
value, entry->flags());
auto msg = Message::EntryAssign(name, entry->id, entry->seq_num.value(),
value, entry->flags);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
} else if (*old_value != *value) {
entry->seq_num_inc();
++entry->seq_num;
// don't send an update if we don't have an assigned id yet
if (entry->id() != 0xffff) {
if (entry->id != 0xffff) {
auto msg =
Message::EntryUpdate(entry->id(), entry->seq_num().value(), value);
Message::EntryUpdate(entry->id, entry->seq_num.value(), value);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
@@ -329,15 +329,15 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value) {
if (!value) return;
std::unique_lock<std::mutex> lock(m_mutex);
auto& entry = m_entries[name];
if (!entry) entry = std::make_shared<StorageEntry>(name);
auto old_value = entry->value();
entry->set_value(value);
if (!entry) entry = std::make_shared<Entry>(name);
auto old_value = entry->value;
entry->value = value;
if (old_value && *old_value == *value) return;
// if we're the server, assign an id if it doesn't have one
if (m_server && entry->id() == 0xffff) {
if (m_server && entry->id == 0xffff) {
unsigned int id = m_idmap.size();
entry->set_id(id);
entry->id = id;
m_idmap.push_back(entry);
}
@@ -345,17 +345,17 @@ void Storage::SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value) {
if (!m_queue_outgoing) return;
auto queue_outgoing = m_queue_outgoing;
if (!old_value || old_value->type() != value->type()) {
entry->seq_num_inc();
auto msg = Message::EntryAssign(name, entry->id(), entry->seq_num().value(),
value, entry->flags());
++entry->seq_num;
auto msg = Message::EntryAssign(name, entry->id, entry->seq_num.value(),
value, entry->flags);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
} else {
entry->seq_num_inc();
++entry->seq_num;
// don't send an update if we don't have an assigned id yet
if (entry->id() != 0xffff) {
if (entry->id != 0xffff) {
auto msg =
Message::EntryUpdate(entry->id(), entry->seq_num().value(), value);
Message::EntryUpdate(entry->id, entry->seq_num.value(), value);
lock.unlock();
queue_outgoing(msg, nullptr, nullptr);
}
@@ -368,13 +368,13 @@ void Storage::SetEntryFlags(StringRef name, unsigned int flags) {
auto i = m_entries.find(name);
if (i == m_entries.end()) return;
auto entry = i->getValue();
if (entry->flags() == flags) return;
entry->set_flags(flags);
if (entry->flags == flags) return;
entry->flags = flags;
// generate message
if (!m_queue_outgoing) return;
auto queue_outgoing = m_queue_outgoing;
unsigned int id = entry->id();
unsigned int id = entry->id;
// don't send an update if we don't have an assigned id yet
if (id != 0xffff) {
lock.unlock();
@@ -385,7 +385,7 @@ void Storage::SetEntryFlags(StringRef name, unsigned int flags) {
unsigned int Storage::GetEntryFlags(StringRef name) const {
std::lock_guard<std::mutex> lock(m_mutex);
auto i = m_entries.find(name);
return i == m_entries.end() ? 0 : i->getValue()->flags();
return i == m_entries.end() ? 0 : i->getValue()->flags;
}
void Storage::DeleteEntry(StringRef name) {
@@ -393,11 +393,11 @@ void Storage::DeleteEntry(StringRef name) {
auto i = m_entries.find(name);
if (i == m_entries.end()) return;
auto entry = i->getValue();
unsigned int id = entry->id();
unsigned int id = entry->id;
m_entries.erase(i); // erase from map
if (id < m_idmap.size()) m_idmap[id].reset();
if (!entry->value()) return;
if (!entry->value) return;
// if it had a value, generate message
// don't send an update if we don't have an assigned id yet
@@ -429,13 +429,13 @@ std::vector<EntryInfo> Storage::GetEntryInfo(StringRef prefix,
for (auto& i : m_entries) {
if (!i.getKey().startswith(prefix)) continue;
auto entry = i.getValue();
auto value = entry->value();
auto value = entry->value;
if (!value) continue;
if (types != 0 && (types & value->type()) == 0) continue;
EntryInfo info;
info.name = i.getKey();
info.type = value->type();
info.flags = entry->flags();
info.flags = entry->flags;
info.last_change = value->last_change();
infos.push_back(std::move(info));
}
@@ -484,7 +484,7 @@ void Storage::SavePersistent(std::ostream& os) const {
auto entry = i.getValue();
// only write persistent-flagged values
if (!entry->IsPersistent()) continue;
entries.push_back(std::make_pair(i.getKey(), entry->value()));
entries.push_back(std::make_pair(i.getKey(), entry->value));
}
}
@@ -869,34 +869,34 @@ next_line:
std::unique_lock<std::mutex> lock(m_mutex);
for (auto& i : entries) {
auto& entry = m_entries[i.first];
if (!entry) entry = std::make_shared<StorageEntry>(i.first);
auto old_value = entry->value();
entry->set_value(i.second);
if (!entry) entry = std::make_shared<Entry>(i.first);
auto old_value = entry->value;
entry->value = i.second;
bool was_persist = entry->IsPersistent();
if (!was_persist) entry->set_flags(entry->flags() | NT_PERSISTENT);
if (!was_persist) entry->flags |= NT_PERSISTENT;
// if we're the server, assign an id if it doesn't have one
if (m_server && entry->id() == 0xffff) {
if (m_server && entry->id == 0xffff) {
unsigned int id = m_idmap.size();
entry->set_id(id);
entry->id = id;
m_idmap.push_back(entry);
}
if (!m_queue_outgoing) continue; // shortcut
entry->seq_num_inc();
++entry->seq_num;
// put on update queue
if (!old_value || old_value->type() != i.second->type())
msgs.emplace_back(Message::EntryAssign(i.first, entry->id(),
entry->seq_num().value(),
i.second, entry->flags()));
else if (entry->id() != 0xffff) {
msgs.emplace_back(Message::EntryAssign(i.first, entry->id,
entry->seq_num.value(),
i.second, entry->flags));
else if (entry->id != 0xffff) {
// don't send an update if we don't have an assigned id yet
if (*old_value != *i.second)
msgs.emplace_back(Message::EntryUpdate(
entry->id(), entry->seq_num().value(), i.second));
entry->id, entry->seq_num.value(), i.second));
if (!was_persist)
msgs.emplace_back(Message::FlagsUpdate(entry->id(), entry->flags()));
msgs.emplace_back(Message::FlagsUpdate(entry->id, entry->flags));
}
}