/* * WriteManager.cpp * * Created on: Sep 25, 2012 * Author: Mitchell Wills */ #include "networktables2/WriteManager.h" #include "networktables2/util/System.h" #include "networktables2/AbstractNetworkTableEntryStore.h" #include WriteManager::WriteManager(FlushableOutgoingEntryReceiver& _receiver, NTThreadManager& _threadManager, AbstractNetworkTableEntryStore& _entryStore, unsigned long _keepAliveDelay) : receiver(_receiver), threadManager(_threadManager), entryStore(_entryStore), keepAliveDelay(_keepAliveDelay){ thread = NULL; lastWrite = 0; incomingAssignmentQueue = new std::queue(); incomingUpdateQueue = new std::queue(); outgoingAssignmentQueue = new std::queue(); outgoingUpdateQueue = new std::queue(); } WriteManager::~WriteManager(){ stop(); //Note: this must occur after stop() to avoid deadlock transactionsLock.take(); delete incomingAssignmentQueue; delete incomingUpdateQueue; delete outgoingAssignmentQueue; delete outgoingUpdateQueue; } void WriteManager::start(){ if(thread!=NULL) stop(); lastWrite = currentTimeMillis(); thread = threadManager.newBlockingPeriodicThread(this, "Write Manager Thread"); } void WriteManager::stop(){ if(thread!=NULL){ thread->stop(); delete thread; thread = NULL; } } void WriteManager::offerOutgoingAssignment(NetworkTableEntry* entry) { { NTSynchronized sync(transactionsLock); ((std::queue*)incomingAssignmentQueue)->push(entry); if(((std::queue*)incomingAssignmentQueue)->size()>=queueSize){ run(); writeWarning("assignment queue overflowed. decrease the rate at which you create new entries or increase the write buffer size"); } } } void WriteManager::offerOutgoingUpdate(NetworkTableEntry* entry) { { NTSynchronized sync(transactionsLock); ((std::queue*)incomingUpdateQueue)->push(entry); if(((std::queue*)incomingUpdateQueue)->size()>=queueSize){ run(); writeWarning("update queue overflowed. decrease the rate at which you update entries or increase the write buffer size"); } } } void WriteManager::run() { { NTSynchronized sync(transactionsLock); //swap the assignment and update queue volatile std::queue* tmp = incomingAssignmentQueue; incomingAssignmentQueue = outgoingAssignmentQueue; outgoingAssignmentQueue = tmp; tmp = incomingUpdateQueue; incomingUpdateQueue = outgoingUpdateQueue; outgoingUpdateQueue = tmp; } bool wrote = false; NetworkTableEntry* entry; while(!((std::queue*)outgoingAssignmentQueue)->empty()){ entry = ((std::queue*)outgoingAssignmentQueue)->front(); ((std::queue*)outgoingAssignmentQueue)->pop(); { NetworkTableEntry * entryCopy; { NTSynchronized sync(entryStore.LOCK); entry->MakeClean(); wrote = true; entryCopy = new NetworkTableEntry(*entry); } receiver.offerOutgoingAssignment(entryCopy); delete entryCopy; } } while(!((std::queue*)outgoingUpdateQueue)->empty()){ entry = ((std::queue*)outgoingUpdateQueue)->front(); ((std::queue*)outgoingUpdateQueue)->pop(); { NetworkTableEntry * entryCopy; { NTSynchronized sync(entryStore.LOCK); entry->MakeClean(); wrote = true; entryCopy = new NetworkTableEntry(*entry); } receiver.offerOutgoingUpdate(entryCopy); delete entryCopy; } } if(wrote){ receiver.flush(); lastWrite = currentTimeMillis(); } else if(currentTimeMillis()-lastWrite>keepAliveDelay) receiver.ensureAlive(); sleep_ms(20); }