#! /usr/bin/env python3 # Copyright (c) FIRST and other WPILib contributors. # Open Source Software; you can modify and/or share it under the terms of # the WPILib BSD license file in the root directory of this project. import array import struct from typing import List, SupportsBytes __all__ = ["StartRecordData", "MetadataRecordData", "DataLogRecord", "DataLogReader"] floatStruct = struct.Struct(" bool: return self.entry == 0 def _getControlType(self) -> int: return self.data[0] def isStart(self) -> bool: return ( self.entry == 0 and len(self.data) >= 17 and self._getControlType() == kControlStart ) def isFinish(self) -> bool: return ( self.entry == 0 and len(self.data) == 5 and self._getControlType() == kControlFinish ) def isSetMetadata(self) -> bool: return ( self.entry == 0 and len(self.data) >= 9 and self._getControlType() == kControlSetMetadata ) def getStartData(self) -> StartRecordData: if not self.isStart(): raise TypeError("not a start record") entry = int.from_bytes(self.data[1:5], byteorder="little", signed=False) name, pos = self._readInnerString(5) type, pos = self._readInnerString(pos) metadata = self._readInnerString(pos)[0] return StartRecordData(entry, name, type, metadata) def getFinishEntry(self) -> int: if not self.isFinish(): raise TypeError("not a finish record") return int.from_bytes(self.data[1:5], byteorder="little", signed=False) def getSetMetadataData(self) -> MetadataRecordData: if not self.isSetMetadata(): raise TypeError("not a finish record") entry = int.from_bytes(self.data[1:5], byteorder="little", signed=False) metadata = self._readInnerString(5)[0] return MetadataRecordData(entry, metadata) def getBoolean(self) -> bool: if len(self.data) != 1: raise TypeError("not a boolean") return self.data[0] != 0 def getInteger(self) -> int: if len(self.data) != 8: raise TypeError("not an integer") return int.from_bytes(self.data, byteorder="little", signed=True) def getFloat(self) -> float: if len(self.data) != 4: raise TypeError("not a float") return floatStruct.unpack(self.data)[0] def getDouble(self) -> float: if len(self.data) != 8: raise TypeError("not a double") return doubleStruct.unpack(self.data)[0] def getString(self) -> str: return str(self.data, encoding="utf-8") def getBooleanArray(self) -> List[bool]: return [x != 0 for x in self.data] def getIntegerArray(self) -> array.array: if (len(self.data) % 8) != 0: raise TypeError("not an integer array") arr = array.array("l") arr.frombytes(self.data) return arr def getFloatArray(self) -> array.array: if (len(self.data) % 4) != 0: raise TypeError("not a float array") arr = array.array("f") arr.frombytes(self.data) return arr def getDoubleArray(self) -> array.array: if (len(self.data) % 8) != 0: raise TypeError("not a double array") arr = array.array("d") arr.frombytes(self.data) return arr def getStringArray(self) -> List[str]: size = int.from_bytes(self.data[:4], byteorder="little", signed=False) if size > ((len(self.data) - 4) / 4): raise TypeError("not a string array") arr = [] pos = 4 for _ in range(size): val, pos = self._readInnerString(pos) arr.append(val) return arr def _readInnerString(self, pos: int) -> tuple[str, int]: size = int.from_bytes( self.data[pos : pos + 4], byteorder="little", signed=False ) end = pos + 4 + size if end > len(self.data): raise TypeError("invalid string size") return str(self.data[pos + 4 : end], encoding="utf-8"), end class DataLogIterator: """DataLogReader iterator.""" def __init__(self, buf: SupportsBytes, pos: int): self.buf = buf self.pos = pos def __iter__(self): return self def _readVarInt(self, pos: int, len: int) -> int: val = 0 for i in range(len): val |= self.buf[pos + i] << (i * 8) return val def __next__(self) -> DataLogRecord: if len(self.buf) < (self.pos + 4): raise StopIteration entryLen = (self.buf[self.pos] & 0x3) + 1 sizeLen = ((self.buf[self.pos] >> 2) & 0x3) + 1 timestampLen = ((self.buf[self.pos] >> 4) & 0x7) + 1 headerLen = 1 + entryLen + sizeLen + timestampLen if len(self.buf) < (self.pos + headerLen): raise StopIteration entry = self._readVarInt(self.pos + 1, entryLen) size = self._readVarInt(self.pos + 1 + entryLen, sizeLen) timestamp = self._readVarInt(self.pos + 1 + entryLen + sizeLen, timestampLen) if len(self.buf) < (self.pos + headerLen + size): raise StopIteration record = DataLogRecord( entry, timestamp, self.buf[self.pos + headerLen : self.pos + headerLen + size], ) self.pos += headerLen + size return record class DataLogReader: """Data log reader (reads logs written by the DataLog class).""" def __init__(self, buf: SupportsBytes): self.buf = buf def __bool__(self): return self.isValid() def isValid(self) -> bool: """Returns true if the data log is valid (e.g. has a valid header).""" return ( len(self.buf) >= 12 and self.buf[:6] == b"WPILOG" and self.getVersion() >= 0x0100 ) def getVersion(self) -> int: """Gets the data log version. Returns 0 if data log is invalid. @return Version number; most significant byte is major, least significant is minor (so version 1.0 will be 0x0100)""" if len(self.buf) < 12: return 0 return int.from_bytes(self.buf[6:8], byteorder="little", signed=False) def getExtraHeader(self) -> str: """Gets the extra header data. @return Extra header data """ if len(self.buf) < 12: return "" size = int.from_bytes(self.buf[8:12], byteorder="little", signed=False) return str(self.buf[12 : 12 + size], encoding="utf-8") def __iter__(self) -> DataLogIterator: extraHeaderSize = int.from_bytes( self.buf[8:12], byteorder="little", signed=False ) return DataLogIterator(self.buf, 12 + extraHeaderSize) if __name__ == "__main__": from datetime import datetime import mmap import sys if len(sys.argv) != 2: print("Usage: datalog.py ", file=sys.stderr) sys.exit(1) with open(sys.argv[1], "r") as f: mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) reader = DataLogReader(mm) if not reader: print("not a log file", file=sys.stderr) sys.exit(1) entries = {} for record in reader: timestamp = record.timestamp / 1000000 if record.isStart(): try: data = record.getStartData() print( f"Start({data.entry}, name='{data.name}', type='{data.type}', metadata='{data.metadata}') [{timestamp}]" ) if data.entry in entries: print("...DUPLICATE entry ID, overriding") entries[data.entry] = data except TypeError as e: print("Start(INVALID)") elif record.isFinish(): try: entry = record.getFinishEntry() print(f"Finish({entry}) [{timestamp}]") if entry not in entries: print("...ID not found") else: del entries[entry] except TypeError as e: print("Finish(INVALID)") elif record.isSetMetadata(): try: data = record.getSetMetadataData() print(f"SetMetadata({data.entry}, '{data.metadata}') [{timestamp}]") if data.entry not in entries: print("...ID not found") except TypeError as e: print("SetMetadata(INVALID)") elif record.isControl(): print("Unrecognized control record") else: print(f"Data({record.entry}, size={len(record.data)}) ", end="") entry = entries.get(record.entry) if entry is None: print("") continue print(f" [{timestamp}]") try: # handle systemTime specially if entry.name == "systemTime" and entry.type == "int64": dt = datetime.fromtimestamp(record.getInteger() / 1000000) print(" {:%Y-%m-%d %H:%M:%S.%f}".format(dt)) continue if entry.type == "double": print(f" {record.getDouble()}") elif entry.type == "int64": print(f" {record.getInteger()}") elif entry.type in ("string", "json"): print(f" '{record.getString()}'") elif entry.type == "boolean": print(f" {record.getBoolean()}") elif entry.type == "boolean[]": arr = record.getBooleanArray() print(f" {arr}") elif entry.type == "double[]": arr = record.getDoubleArray() print(f" {arr}") elif entry.type == "float[]": arr = record.getFloatArray() print(f" {arr}") elif entry.type == "int64[]": arr = record.getIntegerArray() print(f" {arr}") elif entry.type == "string[]": arr = record.getStringArray() print(f" {arr}") except TypeError as e: print(" invalid")