Final move of Dustins network tables patches

Change-Id: I7d351d87e63c8174b62f4ec09f805f685c80019c
This commit is contained in:
Fredric Silberberg
2015-02-07 19:19:59 -05:00
parent 39158754d7
commit bfa4bbaf78
8 changed files with 91 additions and 72 deletions

View File

@@ -35,6 +35,8 @@ public:
class AbstractNetworkTableEntryStore : public IncomingEntryReceiver{
protected:
NTReentrantSemaphore block_namedEntries;
std::map<EntryId,NetworkTableEntry*> idEntries;
std::map<std::string,NetworkTableEntry*> namedEntries;
TableListenerManager& listenerManager;
@@ -45,10 +47,10 @@ protected:
OutgoingEntryReceiver* incomingReceiver;
virtual bool addEntry(NetworkTableEntry* entry) = 0;
virtual bool updateEntry(NetworkTableEntry* entry, SequenceNumber sequenceNumber, EntryValue value) = 0;
public:
virtual ~AbstractNetworkTableEntryStore();
@@ -62,14 +64,14 @@ public:
void SetOutgoingReceiver(OutgoingEntryReceiver* receiver);
void SetIncomingReceiver(OutgoingEntryReceiver* receiver);
void PutOutgoing(std::string& name, NetworkTableEntryType* type, EntryValue value);
void PutOutgoing(NetworkTableEntry* tableEntry, EntryValue value);
void offerIncomingAssignment(NetworkTableEntry* entry);
void offerIncomingUpdate(NetworkTableEntry* entry, EntryId sequenceNumber, EntryValue value);
void notifyEntries(ITable* table, ITableListener* listener);
};

View File

@@ -1,6 +1,8 @@
#include "stdafx.h"
#include "OSAL/Task.h"
#include "NetworkCommunication/UsageReporting.h"
#include "WPIErrors.h"
#include <string.h>
#include <Windows.h>
@@ -55,7 +57,7 @@ DWORD thread_proc( void *p_ptr )
//This sets the name of the thread, which can help to identify threads in win32
#define MS_VC_EXCEPTION 0x406D1388
static void set_thread_name( const char *p_thread_name, DWORD ID )
{
{
#pragma pack(push,8)
typedef struct tagTHREADNAME_INFO
{ DWORD dwType; // Must be 0x1000.
@@ -162,7 +164,7 @@ bool NTTask::Stop()
::TerminateThread( m_Handle, 0 );
// Free the memory
::VirtualFree( Info_.AllocationBase, 0, MEM_RELEASE );
::VirtualFree( Info_.AllocationBase, 0, MEM_RELEASE );
}
//if (Verify())
@@ -289,19 +291,19 @@ bool NTTask::HandleError(STATUS results)
//case S_objLib_OBJ_ID_ERROR:
// wpi_setWPIErrorWithContext(TaskIDError, m_taskName);
// break;
//
//
//case S_objLib_OBJ_DELETED:
// wpi_setWPIErrorWithContext(TaskDeletedError, m_taskName);
// break;
//
//
//case S_taskLib_ILLEGAL_OPTIONS:
// wpi_setWPIErrorWithContext(TaskOptionsError, m_taskName);
// break;
//
//
//case S_memLib_NOT_ENOUGH_MEMORY:
// wpi_setWPIErrorWithContext(TaskMemoryError, m_taskName);
// break;
//
//
//case S_taskLib_ILLEGAL_PRIORITY:
// wpi_setWPIErrorWithContext(TaskPriorityError, m_taskName);
// break;
@@ -312,4 +314,3 @@ bool NTTask::HandleError(STATUS results)
//}
return false;
}

View File

@@ -12,10 +12,10 @@
#include <iostream>
#include <stdio.h>
AbstractNetworkTableEntryStore::AbstractNetworkTableEntryStore(TableListenerManager& lstnManager):
AbstractNetworkTableEntryStore::AbstractNetworkTableEntryStore(TableListenerManager& lstnManager):
listenerManager(lstnManager){
}
AbstractNetworkTableEntryStore::~AbstractNetworkTableEntryStore(){
LOCK.take();
std::map<std::string, NetworkTableEntry*>::iterator itr;
@@ -24,15 +24,15 @@
namedEntries.erase(itr++);
}
}
/**
* Get an entry based on it's name
* @param name the name of the entry to look for
* @return the entry or null if the entry does not exist
*/
NetworkTableEntry* AbstractNetworkTableEntryStore::GetEntry(std::string& name){
{
NTSynchronized sync(LOCK);
{
NTSynchronized sync(block_namedEntries);
std::map<std::string, NetworkTableEntry*>::iterator value_itr = namedEntries.find(name);
if(value_itr != namedEntries.end()) {
return value_itr->second;
@@ -40,11 +40,11 @@
return NULL;
}
}
NetworkTableEntry* AbstractNetworkTableEntryStore::GetEntry(EntryId entryId){
{
NTSynchronized sync(LOCK);
{
NTSynchronized sync(block_namedEntries);
std::map<EntryId, NetworkTableEntry*>::iterator value_itr = idEntries.find(entryId);
if(value_itr != idEntries.end()) {
return value_itr->second;
@@ -52,74 +52,74 @@
return NULL;
}
}
std::vector<std::string>* AbstractNetworkTableEntryStore::keys(){
{
NTSynchronized sync(LOCK);
{
NTSynchronized sync(block_namedEntries);
std::vector<std::string>* keys = new std::vector<std::string>();
std::map<std::string, NetworkTableEntry*>::iterator itr;
for(itr = namedEntries.begin(); itr != namedEntries.end(); itr++)
{
std::string key = (*itr).first;
keys->push_back(key);
}
return (keys);
}
}
/**
* Remove all entries
* NOTE: This method should not be used with applications which cache entries which would lead to unknown results
* This method is for use in testing only
*/
void AbstractNetworkTableEntryStore::clearEntries(){
{
NTSynchronized sync(LOCK);
{
NTSynchronized sync(block_namedEntries);
namedEntries.clear();
idEntries.clear();
}
}
/**
* clear the id's of all entries
*/
void AbstractNetworkTableEntryStore::clearIds(){
{
NTSynchronized sync(LOCK);
{
NTSynchronized sync(block_namedEntries);
std::map<std::string, NetworkTableEntry*>::iterator itr;
idEntries.clear();
for(itr = namedEntries.begin(); itr != namedEntries.end(); itr++)
{
((NetworkTableEntry*)(*itr).second)->ClearId();
}
}
}
void AbstractNetworkTableEntryStore::SetOutgoingReceiver(OutgoingEntryReceiver* receiver){
outgoingReceiver = receiver;
}
void AbstractNetworkTableEntryStore::SetIncomingReceiver(OutgoingEntryReceiver* receiver){
incomingReceiver = receiver;
}
/**
* Stores the given value under the given name and queues it for
* Stores the given value under the given name and queues it for
* transmission to the server.
*
*
* @param name The name under which to store the given value.
* @param type The type of the given value.
* @param value The value to store.
* @throws TableKeyExistsWithDifferentTypeException Thrown if an
* @throws TableKeyExistsWithDifferentTypeException Thrown if an
* entry already exists with the given name and is of a different type.
*/
void AbstractNetworkTableEntryStore::PutOutgoing(std::string& name, NetworkTableEntryType* type, EntryValue value){
{
NTSynchronized sync(LOCK);
{
NTSynchronized sync(block_namedEntries);
std::map<std::string, NetworkTableEntry*>::iterator index = namedEntries.find(name);
NetworkTableEntry* tableEntry;
if(index == namedEntries.end())//if the name does not exist in the current entries
@@ -137,21 +137,21 @@
if(tableEntry->GetType()->id != type->id){
throw TableKeyExistsWithDifferentTypeException(name, tableEntry->GetType());
}
EntryValue oldValue = tableEntry->GetValue();
if(!type->areEqual(value, oldValue)){
if(updateEntry(tableEntry, (SequenceNumber)(tableEntry->GetSequenceNumber() + 1), value)){
outgoingReceiver->offerOutgoingUpdate(tableEntry);
}
tableEntry->FireListener(listenerManager);
}
}
}
}
void AbstractNetworkTableEntryStore::PutOutgoing(NetworkTableEntry* tableEntry, EntryValue value){
{
{
NTSynchronized sync(LOCK);
NetworkTableEntryType* type = tableEntry->GetType();
EntryValue oldValue = tableEntry->GetValue();
@@ -159,15 +159,15 @@
if(updateEntry(tableEntry, (SequenceNumber)(tableEntry->GetSequenceNumber() + 1), value)){
outgoingReceiver->offerOutgoingUpdate(tableEntry);
}
tableEntry->FireListener(listenerManager);
}
}
}
void AbstractNetworkTableEntryStore::offerIncomingAssignment(NetworkTableEntry* entry){
{
NTSynchronized sync(LOCK);
{
NTSynchronized sync(block_namedEntries);
std::map<std::string, NetworkTableEntry*>::iterator itr = namedEntries.find(entry->name);
NetworkTableEntry* tableEntry;
if(addEntry(entry)){
@@ -177,7 +177,7 @@
else{
tableEntry = entry;
}
tableEntry->FireListener(listenerManager);//if we didnt have a pointer, then the copy of the version in the list would call this method, however with the pointer we are updating the version in the list
incomingReceiver->offerOutgoingAssignment(tableEntry);
}
@@ -185,9 +185,9 @@
delete entry;
}
}
void AbstractNetworkTableEntryStore::offerIncomingUpdate(NetworkTableEntry* entry, SequenceNumber squenceNumber, EntryValue value){
{
{
NTSynchronized sync(LOCK);
if(updateEntry(entry, squenceNumber, value)){
entry->FireListener(listenerManager);
@@ -195,15 +195,15 @@
}
}
}
/**
* Called to say that a listener should notify the listener manager of all of the entries
* @param listener
* @param table
* @param table
*/
void AbstractNetworkTableEntryStore::notifyEntries(ITable* table, ITableListener* listener){
{
NTSynchronized sync(LOCK);
{
NTSynchronized sync(block_namedEntries);
std::map<std::string, NetworkTableEntry*>::iterator itr;
for(itr = namedEntries.begin(); itr != namedEntries.end(); itr++)
{

View File

@@ -17,7 +17,7 @@ ClientNetworkTableEntryStore::~ClientNetworkTableEntryStore(){}
bool ClientNetworkTableEntryStore::addEntry(NetworkTableEntry* newEntry){
{
NTSynchronized sync(LOCK);
NTSynchronized sync(block_namedEntries);
NetworkTableEntry* entry = (NetworkTableEntry*)namedEntries[newEntry->name];
if(entry!=NULL){
@@ -28,7 +28,7 @@ bool ClientNetworkTableEntryStore::addEntry(NetworkTableEntry* newEntry){
idEntries[newEntry->GetId()] = entry;
}
}
entry->ForcePut(newEntry->GetSequenceNumber(), newEntry->GetType(), newEntry->GetValue());
}
else{
@@ -41,7 +41,7 @@ bool ClientNetworkTableEntryStore::addEntry(NetworkTableEntry* newEntry){
}
bool ClientNetworkTableEntryStore::updateEntry(NetworkTableEntry* entry, SequenceNumber sequenceNumber, EntryValue value) {
{
{
NTSynchronized sync(LOCK);
entry->ForcePut(sequenceNumber, value);
if(entry->GetId()==NetworkTableEntry::UNKNOWN_ID){
@@ -57,7 +57,7 @@ bool ClientNetworkTableEntryStore::updateEntry(NetworkTableEntry* entry, Sequenc
* @throws IOException
*/
void ClientNetworkTableEntryStore::sendUnknownEntries(NetworkTableConnection& connection) {
{
{
NTSynchronized sync(LOCK);
std::map<std::string, NetworkTableEntry*>::iterator itr;
for(itr = namedEntries.begin(); itr != namedEntries.end(); itr++)

View File

@@ -1,6 +1,6 @@
/**
* An abstraction for the NetworkTable protocol
*
*
* @author mwills
*
*/
@@ -19,6 +19,7 @@ NetworkTableConnection::~NetworkTableConnection(){
void NetworkTableConnection::SetIOStream(IOStream* stream)
{
isValid=(stream!=NULL);
ioStream->SetIOStream(stream); //just passing through
}
@@ -102,7 +103,7 @@ void NetworkTableConnection::sendEntryUpdate(NetworkTableEntry& entry) {
}
void NetworkTableConnection::read(ConnectionAdapter& adapter) {
int messageType = ioStream->readByte();
int messageType = (isValid)?ioStream->readByte():(int)KEEP_ALIVE;
switch (messageType) {
case KEEP_ALIVE:
adapter.keepAlive();

View File

@@ -17,10 +17,10 @@ NetworkTableServer::NetworkTableServer(IOStreamProvider& _streamProvider, Networ
connectionList(&incomingStreamMonitor),
writeManager(connectionList, threadManager, GetEntryStore(), ULONG_MAX),
continuingReceiver(writeManager){
GetEntryStore().SetIncomingReceiver(&continuingReceiver);
GetEntryStore().SetOutgoingReceiver(&continuingReceiver);
incomingStreamMonitor.start();
writeManager.start();
}
@@ -35,6 +35,9 @@ NetworkTableServer::~NetworkTableServer(){
void NetworkTableServer::Close(){
try{
//Note: streamProvider must come before the incomingStreamMonitor so the that task can complete first for the thread to close
// [9/1/2013 Terminator]
streamProvider.close();
incomingStreamMonitor.stop();
writeManager.stop();
connectionList.closeAll();

View File

@@ -36,6 +36,9 @@ void ServerConnectionAdapter::badMessage(BadMessageException& e) {
gotoState(new ServerConnectionState_Error(e));
adapterListener.close(*this, true);
m_IsAdapterListenerClosed=true;
if (readThread) {
readThread->stop();
}
}
void ServerConnectionAdapter::ioException(IOException& e) {
@@ -46,6 +49,9 @@ void ServerConnectionAdapter::ioException(IOException& e) {
else
gotoState(new ServerConnectionState_Error(e));
adapterListener.close(*this, false);
if (readThread) {
readThread->stop();
}
m_IsAdapterListenerClosed=true;
}

View File

@@ -18,9 +18,9 @@ ServerNetworkTableEntryStore::~ServerNetworkTableEntryStore()
bool ServerNetworkTableEntryStore::addEntry(NetworkTableEntry* newEntry)
{
NTSynchronized sync(LOCK);
NTSynchronized sync(block_namedEntries);
NetworkTableEntry* entry = namedEntries[newEntry->name];
if (entry == NULL)
{
newEntry->SetId(nextId++);
@@ -44,13 +44,19 @@ bool ServerNetworkTableEntryStore::updateEntry(NetworkTableEntry* entry, Sequenc
*/
void ServerNetworkTableEntryStore::sendServerHello(NetworkTableConnection& connection)
{
NTSynchronized sync(LOCK);
std::map<std::string, NetworkTableEntry*>::iterator itr;
for (itr = namedEntries.begin(); itr != namedEntries.end(); itr++)
std::vector<NetworkTableEntry *> entry_list;
{
NetworkTableEntry* entry = itr->second;
connection.sendEntryAssignment(*entry);
NTSynchronized sync(block_namedEntries);
std::map<std::string, NetworkTableEntry*>::iterator itr;
for (itr = namedEntries.begin(); itr != namedEntries.end(); itr++)
{
NetworkTableEntry* entry = itr->second;
entry_list.push_back(entry);
}
}
for (size_t i=0;i<entry_list.size();i++)
connection.sendEntryAssignment(*(entry_list[i]));
connection.sendServerHelloComplete();
connection.flush();
}