diff --git a/networktables/cpp/include/networktables2/AbstractNetworkTableEntryStore.h b/networktables/cpp/include/networktables2/AbstractNetworkTableEntryStore.h index b394cfb52f..a246368a25 100644 --- a/networktables/cpp/include/networktables2/AbstractNetworkTableEntryStore.h +++ b/networktables/cpp/include/networktables2/AbstractNetworkTableEntryStore.h @@ -35,6 +35,8 @@ public: class AbstractNetworkTableEntryStore : public IncomingEntryReceiver{ protected: + + NTReentrantSemaphore block_namedEntries; std::map idEntries; std::map namedEntries; TableListenerManager& listenerManager; @@ -45,10 +47,10 @@ protected: OutgoingEntryReceiver* incomingReceiver; - + virtual bool addEntry(NetworkTableEntry* entry) = 0; virtual bool updateEntry(NetworkTableEntry* entry, SequenceNumber sequenceNumber, EntryValue value) = 0; - + public: virtual ~AbstractNetworkTableEntryStore(); @@ -62,14 +64,14 @@ public: void SetOutgoingReceiver(OutgoingEntryReceiver* receiver); void SetIncomingReceiver(OutgoingEntryReceiver* receiver); - + void PutOutgoing(std::string& name, NetworkTableEntryType* type, EntryValue value); void PutOutgoing(NetworkTableEntry* tableEntry, EntryValue value); - + void offerIncomingAssignment(NetworkTableEntry* entry); void offerIncomingUpdate(NetworkTableEntry* entry, EntryId sequenceNumber, EntryValue value); - + void notifyEntries(ITable* table, ITableListener* listener); }; diff --git a/networktables/cpp/lib/Win32/src/main/native/OSAL/Task.cpp b/networktables/cpp/lib/Win32/src/main/native/OSAL/Task.cpp index 0e77a42ee7..a4abcf5d9f 100644 --- a/networktables/cpp/lib/Win32/src/main/native/OSAL/Task.cpp +++ b/networktables/cpp/lib/Win32/src/main/native/OSAL/Task.cpp @@ -1,6 +1,8 @@ #include "stdafx.h" #include "OSAL/Task.h" +#include "NetworkCommunication/UsageReporting.h" +#include "WPIErrors.h" #include #include @@ -55,7 +57,7 @@ DWORD thread_proc( void *p_ptr ) //This sets the name of the thread, which can help to identify threads in win32 #define MS_VC_EXCEPTION 0x406D1388 static void set_thread_name( const char *p_thread_name, DWORD ID ) -{ +{ #pragma pack(push,8) typedef struct tagTHREADNAME_INFO { DWORD dwType; // Must be 0x1000. @@ -162,7 +164,7 @@ bool NTTask::Stop() ::TerminateThread( m_Handle, 0 ); // Free the memory - ::VirtualFree( Info_.AllocationBase, 0, MEM_RELEASE ); + ::VirtualFree( Info_.AllocationBase, 0, MEM_RELEASE ); } //if (Verify()) @@ -289,19 +291,19 @@ bool NTTask::HandleError(STATUS results) //case S_objLib_OBJ_ID_ERROR: // wpi_setWPIErrorWithContext(TaskIDError, m_taskName); // break; - // + // //case S_objLib_OBJ_DELETED: // wpi_setWPIErrorWithContext(TaskDeletedError, m_taskName); // break; - // + // //case S_taskLib_ILLEGAL_OPTIONS: // wpi_setWPIErrorWithContext(TaskOptionsError, m_taskName); // break; - // + // //case S_memLib_NOT_ENOUGH_MEMORY: // wpi_setWPIErrorWithContext(TaskMemoryError, m_taskName); // break; - // + // //case S_taskLib_ILLEGAL_PRIORITY: // wpi_setWPIErrorWithContext(TaskPriorityError, m_taskName); // break; @@ -312,4 +314,3 @@ bool NTTask::HandleError(STATUS results) //} return false; } - diff --git a/networktables/cpp/lib/share/networktables2/AbstractNetworkTableEntryStore.cpp b/networktables/cpp/lib/share/networktables2/AbstractNetworkTableEntryStore.cpp index 00829a07dd..4a078e002a 100644 --- a/networktables/cpp/lib/share/networktables2/AbstractNetworkTableEntryStore.cpp +++ b/networktables/cpp/lib/share/networktables2/AbstractNetworkTableEntryStore.cpp @@ -12,10 +12,10 @@ #include #include - AbstractNetworkTableEntryStore::AbstractNetworkTableEntryStore(TableListenerManager& lstnManager): + AbstractNetworkTableEntryStore::AbstractNetworkTableEntryStore(TableListenerManager& lstnManager): listenerManager(lstnManager){ } - + AbstractNetworkTableEntryStore::~AbstractNetworkTableEntryStore(){ LOCK.take(); std::map::iterator itr; @@ -24,15 +24,15 @@ namedEntries.erase(itr++); } } - + /** * Get an entry based on it's name * @param name the name of the entry to look for * @return the entry or null if the entry does not exist */ NetworkTableEntry* AbstractNetworkTableEntryStore::GetEntry(std::string& name){ - { - NTSynchronized sync(LOCK); + { + NTSynchronized sync(block_namedEntries); std::map::iterator value_itr = namedEntries.find(name); if(value_itr != namedEntries.end()) { return value_itr->second; @@ -40,11 +40,11 @@ return NULL; } } - + NetworkTableEntry* AbstractNetworkTableEntryStore::GetEntry(EntryId entryId){ - { - NTSynchronized sync(LOCK); - + { + NTSynchronized sync(block_namedEntries); + std::map::iterator value_itr = idEntries.find(entryId); if(value_itr != idEntries.end()) { return value_itr->second; @@ -52,74 +52,74 @@ return NULL; } } - + std::vector* AbstractNetworkTableEntryStore::keys(){ - { - NTSynchronized sync(LOCK); + { + NTSynchronized sync(block_namedEntries); std::vector* keys = new std::vector(); std::map::iterator itr; - + for(itr = namedEntries.begin(); itr != namedEntries.end(); itr++) { std::string key = (*itr).first; keys->push_back(key); } - + return (keys); } - + } - + /** * Remove all entries * NOTE: This method should not be used with applications which cache entries which would lead to unknown results * This method is for use in testing only */ void AbstractNetworkTableEntryStore::clearEntries(){ - { - NTSynchronized sync(LOCK); + { + NTSynchronized sync(block_namedEntries); namedEntries.clear(); idEntries.clear(); } } - + /** * clear the id's of all entries */ void AbstractNetworkTableEntryStore::clearIds(){ - { - NTSynchronized sync(LOCK); + { + NTSynchronized sync(block_namedEntries); std::map::iterator itr; idEntries.clear(); - + for(itr = namedEntries.begin(); itr != namedEntries.end(); itr++) { ((NetworkTableEntry*)(*itr).second)->ClearId(); } } } - + void AbstractNetworkTableEntryStore::SetOutgoingReceiver(OutgoingEntryReceiver* receiver){ outgoingReceiver = receiver; } - + void AbstractNetworkTableEntryStore::SetIncomingReceiver(OutgoingEntryReceiver* receiver){ incomingReceiver = receiver; } - + /** - * Stores the given value under the given name and queues it for + * Stores the given value under the given name and queues it for * transmission to the server. - * + * * @param name The name under which to store the given value. * @param type The type of the given value. * @param value The value to store. - * @throws TableKeyExistsWithDifferentTypeException Thrown if an + * @throws TableKeyExistsWithDifferentTypeException Thrown if an * entry already exists with the given name and is of a different type. */ void AbstractNetworkTableEntryStore::PutOutgoing(std::string& name, NetworkTableEntryType* type, EntryValue value){ - { - NTSynchronized sync(LOCK); + { + NTSynchronized sync(block_namedEntries); std::map::iterator index = namedEntries.find(name); NetworkTableEntry* tableEntry; if(index == namedEntries.end())//if the name does not exist in the current entries @@ -137,21 +137,21 @@ if(tableEntry->GetType()->id != type->id){ throw TableKeyExistsWithDifferentTypeException(name, tableEntry->GetType()); } - + EntryValue oldValue = tableEntry->GetValue(); if(!type->areEqual(value, oldValue)){ if(updateEntry(tableEntry, (SequenceNumber)(tableEntry->GetSequenceNumber() + 1), value)){ outgoingReceiver->offerOutgoingUpdate(tableEntry); } - + tableEntry->FireListener(listenerManager); } } } } - + void AbstractNetworkTableEntryStore::PutOutgoing(NetworkTableEntry* tableEntry, EntryValue value){ - { + { NTSynchronized sync(LOCK); NetworkTableEntryType* type = tableEntry->GetType(); EntryValue oldValue = tableEntry->GetValue(); @@ -159,15 +159,15 @@ if(updateEntry(tableEntry, (SequenceNumber)(tableEntry->GetSequenceNumber() + 1), value)){ outgoingReceiver->offerOutgoingUpdate(tableEntry); } - + tableEntry->FireListener(listenerManager); } } } - + void AbstractNetworkTableEntryStore::offerIncomingAssignment(NetworkTableEntry* entry){ - { - NTSynchronized sync(LOCK); + { + NTSynchronized sync(block_namedEntries); std::map::iterator itr = namedEntries.find(entry->name); NetworkTableEntry* tableEntry; if(addEntry(entry)){ @@ -177,7 +177,7 @@ else{ tableEntry = entry; } - + tableEntry->FireListener(listenerManager);//if we didnt have a pointer, then the copy of the version in the list would call this method, however with the pointer we are updating the version in the list incomingReceiver->offerOutgoingAssignment(tableEntry); } @@ -185,9 +185,9 @@ delete entry; } } - + void AbstractNetworkTableEntryStore::offerIncomingUpdate(NetworkTableEntry* entry, SequenceNumber squenceNumber, EntryValue value){ - { + { NTSynchronized sync(LOCK); if(updateEntry(entry, squenceNumber, value)){ entry->FireListener(listenerManager); @@ -195,15 +195,15 @@ } } } - + /** * Called to say that a listener should notify the listener manager of all of the entries * @param listener - * @param table + * @param table */ void AbstractNetworkTableEntryStore::notifyEntries(ITable* table, ITableListener* listener){ - { - NTSynchronized sync(LOCK); + { + NTSynchronized sync(block_namedEntries); std::map::iterator itr; for(itr = namedEntries.begin(); itr != namedEntries.end(); itr++) { diff --git a/networktables/cpp/lib/share/networktables2/client/ClientNetworkTableEntryStore.cpp b/networktables/cpp/lib/share/networktables2/client/ClientNetworkTableEntryStore.cpp index d2fc19d159..61e3cbc701 100644 --- a/networktables/cpp/lib/share/networktables2/client/ClientNetworkTableEntryStore.cpp +++ b/networktables/cpp/lib/share/networktables2/client/ClientNetworkTableEntryStore.cpp @@ -17,7 +17,7 @@ ClientNetworkTableEntryStore::~ClientNetworkTableEntryStore(){} bool ClientNetworkTableEntryStore::addEntry(NetworkTableEntry* newEntry){ { - NTSynchronized sync(LOCK); + NTSynchronized sync(block_namedEntries); NetworkTableEntry* entry = (NetworkTableEntry*)namedEntries[newEntry->name]; if(entry!=NULL){ @@ -28,7 +28,7 @@ bool ClientNetworkTableEntryStore::addEntry(NetworkTableEntry* newEntry){ idEntries[newEntry->GetId()] = entry; } } - + entry->ForcePut(newEntry->GetSequenceNumber(), newEntry->GetType(), newEntry->GetValue()); } else{ @@ -41,7 +41,7 @@ bool ClientNetworkTableEntryStore::addEntry(NetworkTableEntry* newEntry){ } bool ClientNetworkTableEntryStore::updateEntry(NetworkTableEntry* entry, SequenceNumber sequenceNumber, EntryValue value) { - { + { NTSynchronized sync(LOCK); entry->ForcePut(sequenceNumber, value); if(entry->GetId()==NetworkTableEntry::UNKNOWN_ID){ @@ -57,7 +57,7 @@ bool ClientNetworkTableEntryStore::updateEntry(NetworkTableEntry* entry, Sequenc * @throws IOException */ void ClientNetworkTableEntryStore::sendUnknownEntries(NetworkTableConnection& connection) { - { + { NTSynchronized sync(LOCK); std::map::iterator itr; for(itr = namedEntries.begin(); itr != namedEntries.end(); itr++) diff --git a/networktables/cpp/lib/share/networktables2/connection/NetworkTableConnection.cpp b/networktables/cpp/lib/share/networktables2/connection/NetworkTableConnection.cpp index 6f675b072c..c3de76a959 100644 --- a/networktables/cpp/lib/share/networktables2/connection/NetworkTableConnection.cpp +++ b/networktables/cpp/lib/share/networktables2/connection/NetworkTableConnection.cpp @@ -1,6 +1,6 @@ /** * An abstraction for the NetworkTable protocol - * + * * @author mwills * */ @@ -19,6 +19,7 @@ NetworkTableConnection::~NetworkTableConnection(){ void NetworkTableConnection::SetIOStream(IOStream* stream) { + isValid=(stream!=NULL); ioStream->SetIOStream(stream); //just passing through } @@ -102,7 +103,7 @@ void NetworkTableConnection::sendEntryUpdate(NetworkTableEntry& entry) { } void NetworkTableConnection::read(ConnectionAdapter& adapter) { - int messageType = ioStream->readByte(); + int messageType = (isValid)?ioStream->readByte():(int)KEEP_ALIVE; switch (messageType) { case KEEP_ALIVE: adapter.keepAlive(); diff --git a/networktables/cpp/lib/share/networktables2/server/NetworkTableServer.cpp b/networktables/cpp/lib/share/networktables2/server/NetworkTableServer.cpp index ea85356f99..9b7956a2df 100644 --- a/networktables/cpp/lib/share/networktables2/server/NetworkTableServer.cpp +++ b/networktables/cpp/lib/share/networktables2/server/NetworkTableServer.cpp @@ -17,10 +17,10 @@ NetworkTableServer::NetworkTableServer(IOStreamProvider& _streamProvider, Networ connectionList(&incomingStreamMonitor), writeManager(connectionList, threadManager, GetEntryStore(), ULONG_MAX), continuingReceiver(writeManager){ - + GetEntryStore().SetIncomingReceiver(&continuingReceiver); GetEntryStore().SetOutgoingReceiver(&continuingReceiver); - + incomingStreamMonitor.start(); writeManager.start(); } @@ -35,6 +35,9 @@ NetworkTableServer::~NetworkTableServer(){ void NetworkTableServer::Close(){ try{ + //Note: streamProvider must come before the incomingStreamMonitor so the that task can complete first for the thread to close + // [9/1/2013 Terminator] + streamProvider.close(); incomingStreamMonitor.stop(); writeManager.stop(); connectionList.closeAll(); diff --git a/networktables/cpp/lib/share/networktables2/server/ServerConnectionAdapter.cpp b/networktables/cpp/lib/share/networktables2/server/ServerConnectionAdapter.cpp index 195f6e795f..bc0eac2a08 100644 --- a/networktables/cpp/lib/share/networktables2/server/ServerConnectionAdapter.cpp +++ b/networktables/cpp/lib/share/networktables2/server/ServerConnectionAdapter.cpp @@ -36,6 +36,9 @@ void ServerConnectionAdapter::badMessage(BadMessageException& e) { gotoState(new ServerConnectionState_Error(e)); adapterListener.close(*this, true); m_IsAdapterListenerClosed=true; + if (readThread) { + readThread->stop(); + } } void ServerConnectionAdapter::ioException(IOException& e) { @@ -46,6 +49,9 @@ void ServerConnectionAdapter::ioException(IOException& e) { else gotoState(new ServerConnectionState_Error(e)); adapterListener.close(*this, false); + if (readThread) { + readThread->stop(); + } m_IsAdapterListenerClosed=true; } diff --git a/networktables/cpp/lib/share/networktables2/server/ServerNetworkTableEntryStore.cpp b/networktables/cpp/lib/share/networktables2/server/ServerNetworkTableEntryStore.cpp index b1d86c93b6..ac28a7e5a9 100644 --- a/networktables/cpp/lib/share/networktables2/server/ServerNetworkTableEntryStore.cpp +++ b/networktables/cpp/lib/share/networktables2/server/ServerNetworkTableEntryStore.cpp @@ -18,9 +18,9 @@ ServerNetworkTableEntryStore::~ServerNetworkTableEntryStore() bool ServerNetworkTableEntryStore::addEntry(NetworkTableEntry* newEntry) { - NTSynchronized sync(LOCK); + NTSynchronized sync(block_namedEntries); NetworkTableEntry* entry = namedEntries[newEntry->name]; - + if (entry == NULL) { newEntry->SetId(nextId++); @@ -44,13 +44,19 @@ bool ServerNetworkTableEntryStore::updateEntry(NetworkTableEntry* entry, Sequenc */ void ServerNetworkTableEntryStore::sendServerHello(NetworkTableConnection& connection) { - NTSynchronized sync(LOCK); - std::map::iterator itr; - for (itr = namedEntries.begin(); itr != namedEntries.end(); itr++) + std::vector entry_list; { - NetworkTableEntry* entry = itr->second; - connection.sendEntryAssignment(*entry); + NTSynchronized sync(block_namedEntries); + std::map::iterator itr; + for (itr = namedEntries.begin(); itr != namedEntries.end(); itr++) + { + NetworkTableEntry* entry = itr->second; + entry_list.push_back(entry); + } } + + for (size_t i=0;i