diff --git a/ntcoreffi/src/main/native/cpp/DataLogManager.cpp b/ntcoreffi/src/main/native/cpp/DataLogManager.cpp index 023facd628..e1e49b747b 100644 --- a/ntcoreffi/src/main/native/cpp/DataLogManager.cpp +++ b/ntcoreffi/src/main/native/cpp/DataLogManager.cpp @@ -264,12 +264,18 @@ Thread::~Thread() { void Thread::Main() { // based on free disk space, scan for "old" FRC_*.wpilog files and remove { - uintmax_t freeSpace = fs::space(m_logDir).free; + std::error_code ec; + uintmax_t freeSpace; + auto freeSpaceInfo = fs::space(m_logDir, ec); + if (!ec) { + freeSpace = freeSpaceInfo.available; + } else { + freeSpace = UINTMAX_MAX; + } if (freeSpace < kFreeSpaceThreshold) { // Delete oldest FRC_*.wpilog files (ignore FRC_TBD_*.wpilog as we just // created one) std::vector entries; - std::error_code ec; for (auto&& entry : fs::directory_iterator{m_logDir, ec}) { auto stem = entry.path().stem().string(); if (wpi::starts_with(stem, "FRC_") && @@ -462,6 +468,9 @@ static Instance& GetInstance(std::string_view dir = "", std::string_view filename = "", double period = 0.25) { static Instance instance(dir, filename, period); + if (!instance.owner) { + instance.owner.Start(MakeLogDir(dir), filename, period); + } return instance; } @@ -470,6 +479,12 @@ void DataLogManager::Start(std::string_view dir, std::string_view filename, GetInstance(dir, filename, period); } +void DataLogManager::Stop() { + auto& inst = GetInstance(); + inst.owner.GetThread()->m_log.Stop(); + inst.owner.Stop(); +} + void DataLogManager::Log(std::string_view message) { GetInstance().owner.GetThread()->m_messageLog.Append(message); fmt::print("{}\n", message); @@ -503,6 +518,10 @@ void DLM_Start(const char* dir, const char* filename, double period) { DataLogManager::Start(dir, filename, period); } +void DLM_Stop(void) { + DataLogManager::Stop(); +} + void DLM_Log(const char* message) { DataLogManager::Log(message); } diff --git a/ntcoreffi/src/main/native/include/DataLogManager.h b/ntcoreffi/src/main/native/include/DataLogManager.h index 8445399dde..d2635c17e4 100644 --- a/ntcoreffi/src/main/native/include/DataLogManager.h +++ b/ntcoreffi/src/main/native/include/DataLogManager.h @@ -52,6 +52,11 @@ class DataLogManager final { static void Start(std::string_view dir = "", std::string_view filename = "", double period = 0.25); + /** + * Stop data log manager. + */ + static void Stop(); + /** * Log a message to the "messages" entry. The message is also printed to * standard output (followed by a newline). @@ -110,6 +115,11 @@ struct WPI_DataLog; */ void DLM_Start(const char* dir, const char* filename, double period); +/** + * Stop data log manager. + */ +void DLM_Stop(void); + /** * Log a message to the "messages" entry. The message is also printed to * standard output (followed by a newline). diff --git a/ntcoreffi/src/main/native/symbols.txt b/ntcoreffi/src/main/native/symbols.txt index f7f299a6cd..c18014b71c 100644 --- a/ntcoreffi/src/main/native/symbols.txt +++ b/ntcoreffi/src/main/native/symbols.txt @@ -4,6 +4,7 @@ DLM_Log DLM_LogNetworkTables DLM_SignalNewDSDataOccur DLM_Start +DLM_Stop NT_AddListener NT_AddListenerMultiple NT_AddListenerSingle @@ -245,6 +246,7 @@ WPI_DataLog_Resume WPI_DataLog_SetFilename WPI_DataLog_SetMetadata WPI_DataLog_Start +WPI_DataLog_Stop WPI_DestroyEvent WPI_DestroySemaphore WPI_DestroySignalObject diff --git a/wpilibc/src/main/native/cpp/DataLogManager.cpp b/wpilibc/src/main/native/cpp/DataLogManager.cpp index 3907db8432..afe9330daa 100644 --- a/wpilibc/src/main/native/cpp/DataLogManager.cpp +++ b/wpilibc/src/main/native/cpp/DataLogManager.cpp @@ -111,12 +111,18 @@ Thread::~Thread() { void Thread::Main() { // based on free disk space, scan for "old" FRC_*.wpilog files and remove { - uintmax_t freeSpace = fs::space(m_logDir).available; + std::error_code ec; + uintmax_t freeSpace; + auto freeSpaceInfo = fs::space(m_logDir, ec); + if (!ec) { + freeSpace = freeSpaceInfo.available; + } else { + freeSpace = UINTMAX_MAX; + } if (freeSpace < kFreeSpaceThreshold) { // Delete oldest FRC_*.wpilog files (ignore FRC_TBD_*.wpilog as we just // created one) std::vector entries; - std::error_code ec; for (auto&& entry : fs::directory_iterator{m_logDir, ec}) { auto stem = entry.path().stem().string(); if (wpi::starts_with(stem, "FRC_") && @@ -308,6 +314,9 @@ static Instance& GetInstance(std::string_view dir = "", std::string_view filename = "", double period = 0.25) { static Instance instance(dir, filename, period); + if (!instance.owner) { + instance.owner.Start(MakeLogDir(dir), filename, period); + } return instance; } @@ -316,6 +325,12 @@ void DataLogManager::Start(std::string_view dir, std::string_view filename, GetInstance(dir, filename, period); } +void DataLogManager::Stop() { + auto& inst = GetInstance(); + inst.owner.GetThread()->m_log.Stop(); + inst.owner.Stop(); +} + void DataLogManager::Log(std::string_view message) { GetInstance().owner.GetThread()->m_messageLog.Append(message); fmt::print("{}\n", message); diff --git a/wpilibc/src/main/native/include/frc/DataLogManager.h b/wpilibc/src/main/native/include/frc/DataLogManager.h index fa7abbaec5..336af4a01e 100644 --- a/wpilibc/src/main/native/include/frc/DataLogManager.h +++ b/wpilibc/src/main/native/include/frc/DataLogManager.h @@ -49,6 +49,11 @@ class DataLogManager final { static void Start(std::string_view dir = "", std::string_view filename = "", double period = 0.25); + /** + * Stop data log manager. + */ + static void Stop(); + /** * Log a message to the "messages" entry. The message is also printed to * standard output (followed by a newline). diff --git a/wpilibj/src/main/java/edu/wpi/first/wpilibj/DataLogManager.java b/wpilibj/src/main/java/edu/wpi/first/wpilibj/DataLogManager.java index ba2d7808f3..8098ae9af9 100644 --- a/wpilibj/src/main/java/edu/wpi/first/wpilibj/DataLogManager.java +++ b/wpilibj/src/main/java/edu/wpi/first/wpilibj/DataLogManager.java @@ -41,9 +41,10 @@ import java.util.Random; */ public final class DataLogManager { private static DataLog m_log; + private static boolean m_stopped; private static String m_logDir; private static boolean m_filenameOverride; - private static final Thread m_thread; + private static Thread m_thread; private static final ZoneId m_utc = ZoneId.of("UTC"); private static final DateTimeFormatter m_timeFormatter = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss").withZone(m_utc); @@ -59,11 +60,6 @@ public final class DataLogManager { private DataLogManager() {} - static { - m_thread = new Thread(DataLogManager::logMain, "DataLogDS"); - m_thread.setDaemon(true); - } - /** Start data log manager with default directory location. */ public static synchronized void start() { start("", "", 0.25); @@ -100,33 +96,52 @@ public final class DataLogManager { * tradeoff */ public static synchronized void start(String dir, String filename, double period) { - if (m_log != null) { - return; - } - m_logDir = makeLogDir(dir); - m_filenameOverride = !filename.isEmpty(); + if (m_log == null) { + m_logDir = makeLogDir(dir); + m_filenameOverride = !filename.isEmpty(); - // Delete all previously existing FRC_TBD_*.wpilog files. These only exist when the robot - // never connects to the DS, so they are very unlikely to have useful data and just clutter - // the filesystem. - File[] files = - new File(m_logDir) - .listFiles((d, name) -> name.startsWith("FRC_TBD_") && name.endsWith(".wpilog")); - if (files != null) { - for (File file : files) { - if (!file.delete()) { - System.err.println("DataLogManager: could not delete " + file); + // Delete all previously existing FRC_TBD_*.wpilog files. These only exist when the robot + // never connects to the DS, so they are very unlikely to have useful data and just clutter + // the filesystem. + File[] files = + new File(m_logDir) + .listFiles((d, name) -> name.startsWith("FRC_TBD_") && name.endsWith(".wpilog")); + if (files != null) { + for (File file : files) { + if (!file.delete()) { + System.err.println("DataLogManager: could not delete " + file); + } } } + m_log = new DataLog(m_logDir, makeLogFilename(filename), period); + m_messageLog = new StringLogEntry(m_log, "messages"); + + // Log all NT entries and connections + if (m_ntLoggerEnabled) { + startNtLog(); + } + } else if (m_stopped) { + m_log.setFilename(makeLogFilename(filename)); + m_log.resume(); + m_stopped = false; } - m_log = new DataLog(m_logDir, makeLogFilename(filename), period); - m_messageLog = new StringLogEntry(m_log, "messages"); - m_thread.start(); + if (m_thread == null) { + m_thread = new Thread(DataLogManager::logMain, "DataLogDS"); + m_thread.setDaemon(true); + m_thread.start(); + } + } - // Log all NT entries and connections - if (m_ntLoggerEnabled) { - startNtLog(); + /** Stop data log manager. */ + public static synchronized void stop() { + if (m_thread != null) { + m_thread.interrupt(); + m_thread = null; + } + if (m_log != null) { + m_log.stop(); + m_stopped = true; } } diff --git a/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLog.java b/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLog.java index 025487cd29..97c629f6a2 100644 --- a/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLog.java +++ b/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLog.java @@ -106,11 +106,20 @@ public final class DataLog implements AutoCloseable { DataLogJNI.pause(m_impl); } - /** Resumes appending of data records to the log. */ + /** + * Resumes appending of data records to the log. If called after stop(), opens a new file (with + * random name if SetFilename was not called after stop()) and appends Start records and schema + * data values for all previously started entries and schemas. + */ public void resume() { DataLogJNI.resume(m_impl); } + /** Stops appending all records to the log, and closes the log file. */ + public void stop() { + DataLogJNI.stop(m_impl); + } + /** * Returns whether there is a data schema already registered with the given name. * diff --git a/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLogJNI.java b/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLogJNI.java index 08b108c083..f94a86f057 100644 --- a/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLogJNI.java +++ b/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLogJNI.java @@ -18,6 +18,8 @@ public class DataLogJNI extends WPIUtilJNI { static native void resume(long impl); + static native void stop(long impl); + static native void addSchema(long impl, String name, String type, byte[] schema, long timestamp); static native void addSchemaString( diff --git a/wpiutil/src/main/native/cpp/DataLog.cpp b/wpiutil/src/main/native/cpp/DataLog.cpp index 9994618fb5..d05a49e43b 100644 --- a/wpiutil/src/main/native/cpp/DataLog.cpp +++ b/wpiutil/src/main/native/cpp/DataLog.cpp @@ -179,7 +179,7 @@ DataLog::DataLog(wpi::Logger& msglog, DataLog::~DataLog() { { std::scoped_lock lock{m_mutex}; - m_active = false; + m_state = kShutdown; m_doFlush = true; } m_cond.notify_all(); @@ -204,12 +204,25 @@ void DataLog::Flush() { void DataLog::Pause() { std::scoped_lock lock{m_mutex}; - m_paused = true; + m_state = kPaused; } void DataLog::Resume() { std::scoped_lock lock{m_mutex}; - m_paused = false; + if (m_state == kPaused) { + m_state = kActive; + } else if (m_state == kStopped) { + m_state = kStart; + } +} + +void DataLog::Stop() { + { + std::scoped_lock lock{m_mutex}; + m_state = kStopped; + m_newFilename.clear(); + } + m_cond.notify_all(); } bool DataLog::HasSchema(std::string_view name) const { @@ -229,12 +242,16 @@ void DataLog::AddSchema(std::string_view name, std::string_view type, if (entryInfo.id != 0) { return; // don't add duplicates } + entryInfo.schemaData.assign(schema.begin(), schema.end()); int entry = StartImpl(fullName, type, {}, timestamp); // inline AppendRaw() without releasing lock if (entry <= 0) { [[unlikely]] return; // should never happen, but check anyway } + if (m_state != kActive && m_state != kPaused) { + [[unlikely]] return; + } StartRecord(entry, timestamp, schema.size(), 0); AppendImpl(schema); } @@ -283,105 +300,201 @@ static std::string MakeRandomFilename() { return filename; } -void DataLog::WriterThreadMain(std::string_view dir) { - std::chrono::duration periodTime{m_period}; +struct DataLog::WriterThreadState { + explicit WriterThreadState(std::string_view dir) : dirPath{dir} {} + WriterThreadState(const WriterThreadState&) = delete; + WriterThreadState& operator=(const WriterThreadState&) = delete; + ~WriterThreadState() { Close(); } - std::error_code ec; - fs::path dirPath{dir}; + void Close() { + if (f != fs::kInvalidFile) { + fs::CloseFile(f); + f = fs::kInvalidFile; + } + } + + void SetFilename(std::string_view fn) { + baseFilename = fn; + filename = fn; + path = dirPath / filename; + segmentCount = 1; + } + + void IncrementFilename() { + fs::path basePath{baseFilename}; + filename = fmt::format("{}.{}{}", basePath.stem().string(), ++segmentCount, + basePath.extension().string()); + path = dirPath / filename; + } + + fs::path dirPath; + std::string baseFilename; std::string filename; - - { - std::scoped_lock lock{m_mutex}; - filename = std::move(m_newFilename); - m_newFilename.clear(); - } - - if (filename.empty()) { - filename = MakeRandomFilename(); - } - + fs::path path; fs::file_t f = fs::kInvalidFile; + uintmax_t freeSpace = UINTMAX_MAX; + int segmentCount = 1; +}; + +void DataLog::StartLogFile(WriterThreadState& state) { + std::error_code ec; + + if (state.filename.empty()) { + state.SetFilename(MakeRandomFilename()); + } // get free space - uintmax_t freeSpace = fs::space(dirPath).available; - if (freeSpace < kMinFreeSpace) { + auto freeSpaceInfo = fs::space(state.dirPath, ec); + if (!ec) { + state.freeSpace = freeSpaceInfo.available; + } else { + state.freeSpace = UINTMAX_MAX; + } + if (state.freeSpace < kMinFreeSpace) { WPI_ERROR(m_msglog, "Insufficient free space ({} available), no log being saved", - FormatBytesSize(freeSpace)); + FormatBytesSize(state.freeSpace)); } else { // try preferred filename, or randomize it a few times, before giving up for (int i = 0; i < 5; ++i) { // open file for append #ifdef _WIN32 // WIN32 doesn't allow combination of CreateNew and Append - f = fs::OpenFileForWrite(dirPath / filename, ec, fs::CD_CreateNew, - fs::OF_None); + state.f = + fs::OpenFileForWrite(state.path, ec, fs::CD_CreateNew, fs::OF_None); #else - f = fs::OpenFileForWrite(dirPath / filename, ec, fs::CD_CreateNew, - fs::OF_Append); + state.f = + fs::OpenFileForWrite(state.path, ec, fs::CD_CreateNew, fs::OF_Append); #endif if (ec) { WPI_ERROR(m_msglog, "Could not open log file '{}': {}", - (dirPath / filename).string(), ec.message()); + state.path.string(), ec.message()); // try again with random filename - filename = MakeRandomFilename(); + state.SetFilename(MakeRandomFilename()); } else { break; } } - if (f == fs::kInvalidFile) { + if (state.f == fs::kInvalidFile) { WPI_ERROR(m_msglog, "Could not open log file, no log being saved"); } else { - WPI_INFO(m_msglog, "Logging to '{}' ({} free space)", - (dirPath / filename).string(), FormatBytesSize(freeSpace)); + WPI_INFO(m_msglog, "Logging to '{}' ({} free space)", state.path.string(), + FormatBytesSize(state.freeSpace)); } } // write header (version 1.0) - if (f != fs::kInvalidFile) { + if (state.f != fs::kInvalidFile) { const uint8_t header[] = {'W', 'P', 'I', 'L', 'O', 'G', 0, 1}; - WriteToFile(f, header, filename, m_msglog); + WriteToFile(state.f, header, state.filename, m_msglog); uint8_t extraLen[4]; support::endian::write32le(extraLen, m_extraHeader.size()); - WriteToFile(f, extraLen, filename, m_msglog); + WriteToFile(state.f, extraLen, state.filename, m_msglog); if (m_extraHeader.size() > 0) { - WriteToFile(f, + WriteToFile(state.f, {reinterpret_cast(m_extraHeader.data()), m_extraHeader.size()}, - filename, m_msglog); + state.filename, m_msglog); } } +} +void DataLog::WriterThreadMain(std::string_view dir) { + std::chrono::duration periodTime{m_period}; + + WriterThreadState state{dir}; + { + std::scoped_lock lock{m_mutex}; + state.SetFilename(m_newFilename); + m_newFilename.clear(); + } + StartLogFile(state); + + std::error_code ec; std::vector toWrite; int freeSpaceCount = 0; + int checkExistCount = 0; bool blocked = false; + uintmax_t written = 0; std::unique_lock lock{m_mutex}; - while (m_active) { + while (m_state != kShutdown) { bool doFlush = false; auto timeoutTime = std::chrono::steady_clock::now() + periodTime; if (m_cond.wait_until(lock, timeoutTime) == std::cv_status::timeout) { doFlush = true; } - if (!m_newFilename.empty() && f != fs::kInvalidFile) { + if (m_state == kStopped) { + state.Close(); + continue; + } + + bool doStart = false; + + // if file was deleted, recreate it with the same name + if (++checkExistCount >= 10) { + checkExistCount = 0; + lock.unlock(); + bool exists = fs::exists(state.path, ec); + lock.lock(); + if (!ec && !exists) { + state.Close(); + state.IncrementFilename(); + WPI_INFO(m_msglog, "Log file deleted, recreating as fresh log '{}'", + state.filename); + doStart = true; + } + } + + // start new file if file exceeds 1.8 GB + if (written > 1800000000ull) { + state.Close(); + state.IncrementFilename(); + WPI_INFO(m_msglog, "Log file reached 1.8 GB, starting new file '{}'", + state.filename); + doStart = true; + } + + if (m_state == kStart || doStart) { + lock.unlock(); + StartLogFile(state); + lock.lock(); + if (state.f != fs::kInvalidFile) { + // Emit start and schema data records + for (auto&& entryInfo : m_entries) { + AppendStartRecord(entryInfo.second.id, entryInfo.first(), + entryInfo.second.type, + m_entryIds[entryInfo.second.id].metadata, 0); + if (!entryInfo.second.schemaData.empty()) { + StartRecord(entryInfo.second.id, 0, + entryInfo.second.schemaData.size(), 0); + AppendImpl(entryInfo.second.schemaData); + } + } + } + m_state = kActive; + written = 0; + } + + if (!m_newFilename.empty() && state.f != fs::kInvalidFile) { auto newFilename = std::move(m_newFilename); m_newFilename.clear(); - lock.unlock(); // rename - if (filename != newFilename) { - fs::rename(dirPath / filename, dirPath / newFilename, ec); + if (state.filename != newFilename) { + lock.unlock(); + fs::rename(state.path, state.dirPath / newFilename, ec); + lock.lock(); } if (ec) { WPI_ERROR(m_msglog, "Could not rename log file from '{}' to '{}': {}", - filename, newFilename, ec.message()); + state.filename, newFilename, ec.message()); } else { - WPI_INFO(m_msglog, "Renamed log file from '{}' to '{}'", filename, + WPI_INFO(m_msglog, "Renamed log file from '{}' to '{}'", state.filename, newFilename); } - filename = std::move(newFilename); - lock.lock(); + state.SetFilename(newFilename); } if (doFlush || m_doFlush) { @@ -393,39 +506,45 @@ void DataLog::WriterThreadMain(std::string_view dir) { // swap outgoing with empty vector toWrite.swap(m_outgoing); - if (f != fs::kInvalidFile && !blocked) { + if (state.f != fs::kInvalidFile && !blocked) { lock.unlock(); // update free space every 10 flushes (in case other things are writing) if (++freeSpaceCount >= 10) { freeSpaceCount = 0; - freeSpace = fs::space(dirPath).available; + auto freeSpaceInfo = fs::space(state.dirPath, ec); + if (!ec) { + state.freeSpace = freeSpaceInfo.available; + } else { + state.freeSpace = UINTMAX_MAX; + } } // write buffers to file for (auto&& buf : toWrite) { // stop writing when we go below the minimum free space - freeSpace -= buf.GetData().size(); - if (freeSpace < kMinFreeSpace) { + state.freeSpace -= buf.GetData().size(); + written += buf.GetData().size(); + if (state.freeSpace < kMinFreeSpace) { [[unlikely]] WPI_ERROR( m_msglog, "Stopped logging due to low free space ({} available)", - FormatBytesSize(freeSpace)); + FormatBytesSize(state.freeSpace)); blocked = true; break; } - WriteToFile(f, buf.GetData(), filename, m_msglog); + WriteToFile(state.f, buf.GetData(), state.filename, m_msglog); } // sync to storage #if defined(__linux__) - ::fdatasync(f); + ::fdatasync(state.f); #elif defined(__APPLE__) - ::fsync(f); + ::fsync(state.f); #endif lock.lock(); if (blocked) { - [[unlikely]] m_paused = true; + [[unlikely]] m_state = kPaused; } } @@ -439,10 +558,6 @@ void DataLog::WriterThreadMain(std::string_view dir) { toWrite.resize(0); } } - - if (f != fs::kInvalidFile) { - fs::CloseFile(f); - } } void DataLog::WriterThreadMain( @@ -465,7 +580,7 @@ void DataLog::WriterThreadMain( std::vector toWrite; std::unique_lock lock{m_mutex}; - while (m_active) { + while (m_state != kShutdown) { bool doFlush = false; auto timeoutTime = std::chrono::steady_clock::now() + periodTime; if (m_cond.wait_until(lock, timeoutTime) == std::cv_status::timeout) { @@ -521,9 +636,9 @@ int DataLog::StartImpl(std::string_view name, std::string_view type, if (entryInfo.id == 0) { entryInfo.id = ++m_lastId; } - auto& savedCount = m_entryCounts[entryInfo.id]; - ++savedCount; - if (savedCount > 1) { + auto& entryInfo2 = m_entryIds[entryInfo.id]; + ++entryInfo2.count; + if (entryInfo2.count > 1) { if (entryInfo.type != type) { WPI_ERROR(m_msglog, "type mismatch for '{}': was '{}', requested '{}'; ignoring", @@ -533,15 +648,26 @@ int DataLog::StartImpl(std::string_view name, std::string_view type, return entryInfo.id; } entryInfo.type = type; + entryInfo2.metadata = metadata; + + if (m_state != kActive && m_state != kPaused) { + [[unlikely]] return entryInfo.id; + } + + AppendStartRecord(entryInfo.id, name, type, metadata, timestamp); + return entryInfo.id; +} + +void DataLog::AppendStartRecord(int id, std::string_view name, + std::string_view type, + std::string_view metadata, int64_t timestamp) { size_t strsize = name.size() + type.size() + metadata.size(); uint8_t* buf = StartRecord(0, timestamp, 5 + 12 + strsize, 5); *buf++ = impl::kControlStart; - wpi::support::endian::write32le(buf, entryInfo.id); + wpi::support::endian::write32le(buf, id); AppendStringImpl(name); AppendStringImpl(type); AppendStringImpl(metadata); - - return entryInfo.id; } void DataLog::Finish(int entry, int64_t timestamp) { @@ -549,15 +675,18 @@ void DataLog::Finish(int entry, int64_t timestamp) { return; } std::scoped_lock lock{m_mutex}; - auto& savedCount = m_entryCounts[entry]; - if (savedCount == 0) { + auto& entryInfo2 = m_entryIds[entry]; + if (entryInfo2.count == 0) { return; } - --savedCount; - if (savedCount != 0) { + --entryInfo2.count; + if (entryInfo2.count != 0) { return; } - m_entryCounts.erase(entry); + m_entryIds.erase(entry); + if (m_state != kActive && m_state != kPaused) { + [[unlikely]] return; + } uint8_t* buf = StartRecord(0, timestamp, 5, 5); *buf++ = impl::kControlFinish; wpi::support::endian::write32le(buf, entry); @@ -569,6 +698,10 @@ void DataLog::SetMetadata(int entry, std::string_view metadata, return; } std::scoped_lock lock{m_mutex}; + m_entryIds[entry].metadata = metadata; + if (m_state != kActive && m_state != kPaused) { + [[unlikely]] return; + } uint8_t* buf = StartRecord(0, timestamp, 5 + 4 + metadata.size(), 5); *buf++ = impl::kControlSetMetadata; wpi::support::endian::write32le(buf, entry); @@ -584,7 +717,7 @@ uint8_t* DataLog::Reserve(size_t size) { m_msglog, "outgoing buffers exceeded threshold, pausing logging--" "consider flushing to disk more frequently (smaller period)"); - m_paused = true; + m_state = kPaused; } m_outgoing.emplace_back(); } else { @@ -626,8 +759,8 @@ void DataLog::AppendRaw(int entry, std::span data, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, data.size(), 0); AppendImpl(data); @@ -640,8 +773,8 @@ void DataLog::AppendRaw2(int entry, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } size_t size = 0; for (auto&& chunk : data) { @@ -658,8 +791,8 @@ void DataLog::AppendBoolean(int entry, bool value, int64_t timestamp) { return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, 1, 1); buf[0] = value ? 1 : 0; @@ -670,8 +803,8 @@ void DataLog::AppendInteger(int entry, int64_t value, int64_t timestamp) { return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, 8, 8); wpi::support::endian::write64le(buf, value); @@ -682,8 +815,8 @@ void DataLog::AppendFloat(int entry, float value, int64_t timestamp) { return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, 4, 4); if constexpr (wpi::support::endian::system_endianness() == @@ -699,8 +832,8 @@ void DataLog::AppendDouble(int entry, double value, int64_t timestamp) { return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, 8, 8); if constexpr (wpi::support::endian::system_endianness() == @@ -724,8 +857,8 @@ void DataLog::AppendBooleanArray(int entry, std::span arr, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, arr.size(), 0); uint8_t* buf; @@ -748,8 +881,8 @@ void DataLog::AppendBooleanArray(int entry, std::span arr, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, arr.size(), 0); uint8_t* buf; @@ -783,8 +916,8 @@ void DataLog::AppendIntegerArray(int entry, std::span arr, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, arr.size() * 8, 0); uint8_t* buf; @@ -816,8 +949,8 @@ void DataLog::AppendFloatArray(int entry, std::span arr, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, arr.size() * 4, 0); uint8_t* buf; @@ -849,8 +982,8 @@ void DataLog::AppendDoubleArray(int entry, std::span arr, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, arr.size() * 8, 0); uint8_t* buf; @@ -882,8 +1015,8 @@ void DataLog::AppendStringArray(int entry, std::span arr, size += 4 + str.size(); } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, size, 4); wpi::support::endian::write32le(buf, arr.size()); @@ -905,8 +1038,8 @@ void DataLog::AppendStringArray(int entry, size += 4 + str.size(); } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, size, 4); wpi::support::endian::write32le(buf, arr.size()); @@ -928,8 +1061,8 @@ void DataLog::AppendStringArray(int entry, size += 4 + str.len; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, size, 4); wpi::support::endian::write32le(buf, arr.size()); @@ -975,6 +1108,10 @@ void WPI_DataLog_Resume(struct WPI_DataLog* datalog) { reinterpret_cast(datalog)->Resume(); } +void WPI_DataLog_Stop(struct WPI_DataLog* datalog) { + reinterpret_cast(datalog)->Stop(); +} + int WPI_DataLog_Start(struct WPI_DataLog* datalog, const char* name, const char* type, const char* metadata, int64_t timestamp) { diff --git a/wpiutil/src/main/native/cpp/jni/DataLogJNI.cpp b/wpiutil/src/main/native/cpp/jni/DataLogJNI.cpp index cbc233dc50..c78c891ea5 100644 --- a/wpiutil/src/main/native/cpp/jni/DataLogJNI.cpp +++ b/wpiutil/src/main/native/cpp/jni/DataLogJNI.cpp @@ -111,6 +111,22 @@ Java_edu_wpi_first_util_datalog_DataLogJNI_resume reinterpret_cast(impl)->Resume(); } +/* + * Class: edu_wpi_first_util_datalog_DataLogJNI + * Method: stop + * Signature: (J)V + */ +JNIEXPORT void JNICALL +Java_edu_wpi_first_util_datalog_DataLogJNI_stop + (JNIEnv* env, jclass, jlong impl) +{ + if (impl == 0) { + wpi::ThrowNullPointerException(env, "impl is null"); + return; + } + reinterpret_cast(impl)->Stop(); +} + /* * Class: edu_wpi_first_util_datalog_DataLogJNI * Method: addSchema diff --git a/wpiutil/src/main/native/include/wpi/DataLog.h b/wpiutil/src/main/native/include/wpi/DataLog.h index 9e53bdcfa6..99db964628 100644 --- a/wpiutil/src/main/native/include/wpi/DataLog.h +++ b/wpiutil/src/main/native/include/wpi/DataLog.h @@ -174,10 +174,18 @@ class DataLog final { void Pause(); /** - * Resumes appending of data records to the log. + * Resumes appending of data records to the log. If called after Stop(), + * opens a new file (with random name if SetFilename was not called after + * Stop()) and appends Start records and schema data values for all previously + * started entries and schemas. */ void Resume(); + /** + * Stops appending all records to the log, and closes the log file. + */ + void Stop(); + /** * Returns whether there is a data schema already registered with the given * name. @@ -456,6 +464,9 @@ class DataLog final { int64_t timestamp); private: + struct WriterThreadState; + + void StartLogFile(WriterThreadState& state); void WriterThreadMain(std::string_view dir); void WriterThreadMain( std::function data)> write); @@ -468,13 +479,20 @@ class DataLog final { uint8_t* Reserve(size_t size); void AppendImpl(std::span data); void AppendStringImpl(std::string_view str); + void AppendStartRecord(int id, std::string_view name, std::string_view type, + std::string_view metadata, int64_t timestamp); wpi::Logger& m_msglog; mutable wpi::mutex m_mutex; wpi::condition_variable m_cond; - bool m_active{true}; bool m_doFlush{false}; - bool m_paused{false}; + enum State { + kStart, + kActive, + kPaused, + kStopped, + kShutdown, + } m_state = kActive; double m_period; std::string m_extraHeader; std::string m_newFilename; @@ -483,10 +501,15 @@ class DataLog final { std::vector m_outgoing; struct EntryInfo { std::string type; + std::vector schemaData; // only set for schema entries int id{0}; }; wpi::StringMap m_entries; - wpi::DenseMap m_entryCounts; + struct EntryInfo2 { + std::string metadata; + unsigned int count; + }; + wpi::DenseMap m_entryIds; int m_lastId = 0; std::thread m_thread; }; @@ -1113,12 +1136,22 @@ void WPI_DataLog_Flush(struct WPI_DataLog* datalog); void WPI_DataLog_Pause(struct WPI_DataLog* datalog); /** - * Resumes appending of data records to the log. + * Resumes appending of data records to the log. If called after Stop(), + * opens a new file (with random name if SetFilename was not called after + * Stop()) and appends Start records and schema data values for all previously + * started entries and schemas. * * @param datalog data log */ void WPI_DataLog_Resume(struct WPI_DataLog* datalog); +/** + * Stops appending all records to the log, and closes the log file. + * + * @param datalog data log + */ +void WPI_DataLog_Stop(struct WPI_DataLog* datalog); + /** * Start an entry. Duplicate names are allowed (with the same type), and * result in the same index being returned (Start/Finish are reference