mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-20 00:51:42 +00:00
354 lines
12 KiB
Python
Executable File
354 lines
12 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
# Copyright (c) FIRST and other WPILib contributors.
|
|
# Open Source Software; you can modify and/or share it under the terms of
|
|
# the WPILib BSD license file in the root directory of this project.
|
|
|
|
import array
|
|
import struct
|
|
import msgpack
|
|
from typing import List, SupportsBytes
|
|
|
|
__all__ = ["StartRecordData", "MetadataRecordData", "DataLogRecord", "DataLogReader"]
|
|
|
|
floatStruct = struct.Struct("<f")
|
|
doubleStruct = struct.Struct("<d")
|
|
|
|
kControlStart = 0
|
|
kControlFinish = 1
|
|
kControlSetMetadata = 2
|
|
|
|
|
|
class StartRecordData:
    """Payload of a start control record.

    Start records are created by DataLog.start() when the log is written;
    DataLogRecord.getStartData() decodes one into an instance of this class.

    entry: Entry ID; this will be used for this entry in future records.
    name: Entry name.
    type: Type of the stored data for this entry, as a string, e.g. "double".
    metadata: Initial metadata.
    """

    def __init__(self, entry: int, name: str, type: str, metadata: str):
        # Pure value holder: every argument is stored unchanged.
        self.entry, self.name = entry, name
        self.type, self.metadata = type, metadata
|
|
|
|
|
|
class MetadataRecordData:
    """Payload of a set metadata control record.

    Set metadata records are created by DataLog.setMetadata();
    DataLogRecord.getSetMetadataData() decodes one into an instance of this
    class.

    entry: Entry ID.
    metadata: New metadata for the entry.
    """

    def __init__(self, entry: int, metadata: str):
        # Pure value holder: both arguments are stored unchanged.
        self.entry, self.metadata = entry, metadata
|
|
|
|
|
|
class DataLogRecord:
    """A record in the data log. May represent either a control record
    (entry == 0) or a data record.

    entry: Entry ID of the record (0 for control records).
    timestamp: Record timestamp, as the integer stored in the log.
    data: Raw record payload; the get*() methods decode it. All multi-byte
        values in the payload are little-endian.
    """

    def __init__(self, entry: int, timestamp: int, data: SupportsBytes):
        self.entry = entry
        self.timestamp = timestamp
        self.data = data

    def isControl(self) -> bool:
        """Returns true if this is a control record (entry ID 0)."""
        return self.entry == 0

    def _getControlType(self) -> int:
        """Returns the control record type, stored in the first payload byte."""
        return self.data[0]

    def isStart(self) -> bool:
        """Returns true if this is a start control record."""
        # Minimum size: 1 type byte + 4-byte entry ID + three 4-byte string
        # length prefixes (name, type, metadata).
        return (
            self.entry == 0
            and len(self.data) >= 17
            and self._getControlType() == kControlStart
        )

    def isFinish(self) -> bool:
        """Returns true if this is a finish control record."""
        # Exact size: 1 type byte + 4-byte entry ID.
        return (
            self.entry == 0
            and len(self.data) == 5
            and self._getControlType() == kControlFinish
        )

    def isSetMetadata(self) -> bool:
        """Returns true if this is a set metadata control record."""
        # Minimum size: 1 type byte + 4-byte entry ID + 4-byte string length.
        return (
            self.entry == 0
            and len(self.data) >= 9
            and self._getControlType() == kControlSetMetadata
        )

    def getStartData(self) -> StartRecordData:
        """Decodes a start control record.

        Raises TypeError if this is not a start record or a contained string
        runs past the end of the payload.
        """
        if not self.isStart():
            raise TypeError("not a start record")
        entry = int.from_bytes(self.data[1:5], byteorder="little", signed=False)
        name, pos = self._readInnerString(5)
        typeStr, pos = self._readInnerString(pos)
        metadata = self._readInnerString(pos)[0]
        return StartRecordData(entry, name, typeStr, metadata)

    def getFinishEntry(self) -> int:
        """Decodes a finish control record and returns the finished entry ID.

        Raises TypeError if this is not a finish record.
        """
        if not self.isFinish():
            raise TypeError("not a finish record")
        return int.from_bytes(self.data[1:5], byteorder="little", signed=False)

    def getSetMetadataData(self) -> MetadataRecordData:
        """Decodes a set metadata control record.

        Raises TypeError if this is not a set metadata record or the metadata
        string runs past the end of the payload.
        """
        if not self.isSetMetadata():
            raise TypeError("not a set metadata record")
        entry = int.from_bytes(self.data[1:5], byteorder="little", signed=False)
        metadata = self._readInnerString(5)[0]
        return MetadataRecordData(entry, metadata)

    def getBoolean(self) -> bool:
        """Decodes the payload as a boolean (exactly 1 byte, nonzero is true)."""
        if len(self.data) != 1:
            raise TypeError("not a boolean")
        return self.data[0] != 0

    def getInteger(self) -> int:
        """Decodes the payload as a signed 64-bit integer (exactly 8 bytes)."""
        if len(self.data) != 8:
            raise TypeError("not an integer")
        return int.from_bytes(self.data, byteorder="little", signed=True)

    def getFloat(self) -> float:
        """Decodes the payload as a 32-bit float (exactly 4 bytes)."""
        if len(self.data) != 4:
            raise TypeError("not a float")
        return floatStruct.unpack(self.data)[0]

    def getDouble(self) -> float:
        """Decodes the payload as a 64-bit float (exactly 8 bytes)."""
        if len(self.data) != 8:
            raise TypeError("not a double")
        return doubleStruct.unpack(self.data)[0]

    def getString(self) -> str:
        """Decodes the whole payload as a UTF-8 string."""
        return str(self.data, encoding="utf-8")

    def getMsgPack(self):
        """Decodes the whole payload as a MessagePack object."""
        return msgpack.unpackb(self.data)

    def getBooleanArray(self) -> List[bool]:
        """Decodes the payload as one boolean per byte (nonzero is true)."""
        return [x != 0 for x in self.data]

    def getIntegerArray(self) -> array.array:
        """Decodes the payload as an array of signed 64-bit integers.

        Raises TypeError if the payload size is not a multiple of 8.
        """
        # "q" is always 8 bytes; "l" (C long) is only 4 bytes on Windows and
        # other ILP32/LLP64 platforms, which would mis-decode the values.
        return self._readArray("q", 8, "not an integer array")

    def getFloatArray(self) -> array.array:
        """Decodes the payload as an array of 32-bit floats.

        Raises TypeError if the payload size is not a multiple of 4.
        """
        return self._readArray("f", 4, "not a float array")

    def getDoubleArray(self) -> array.array:
        """Decodes the payload as an array of 64-bit floats.

        Raises TypeError if the payload size is not a multiple of 8.
        """
        return self._readArray("d", 8, "not a double array")

    def getStringArray(self) -> List[str]:
        """Decodes the payload as a 4-byte count followed by that many
        length-prefixed UTF-8 strings.

        Raises TypeError if the count cannot fit in the payload or a string
        runs past the end of the payload.
        """
        size = int.from_bytes(self.data[:4], byteorder="little", signed=False)
        # Every string needs at least its own 4-byte length prefix, so a
        # larger count than that cannot be valid (also rejects short payloads).
        if size * 4 > len(self.data) - 4:
            raise TypeError("not a string array")
        arr = []
        pos = 4
        for _ in range(size):
            val, pos = self._readInnerString(pos)
            arr.append(val)
        return arr

    def _readArray(self, typecode: str, itemSize: int, error: str) -> array.array:
        """Decodes the whole payload as a packed little-endian array.

        typecode: array module typecode of the elements.
        itemSize: Size of one element in the log, in bytes.
        error: Message of the TypeError raised when the payload size is not
            a multiple of itemSize.
        """
        import sys

        if (len(self.data) % itemSize) != 0:
            raise TypeError(error)
        arr = array.array(typecode)
        arr.frombytes(self.data)
        # frombytes() interprets the bytes in machine order, but the log is
        # little-endian, so swap on big-endian hosts.
        if sys.byteorder != "little":
            arr.byteswap()
        return arr

    def _readInnerString(self, pos: int) -> tuple[str, int]:
        """Reads a length-prefixed UTF-8 string starting at pos.

        Returns the decoded string and the position just past it. Raises
        TypeError if the string would extend beyond the payload.
        """
        size = int.from_bytes(
            self.data[pos : pos + 4], byteorder="little", signed=False
        )
        end = pos + 4 + size
        if end > len(self.data):
            raise TypeError("invalid string size")
        return str(self.data[pos + 4 : end], encoding="utf-8"), end
|
|
|
|
|
|
class DataLogIterator:
    """Iterator over the records of a data log buffer (see DataLogReader).

    buf: The buffer holding the whole log.
    pos: Offset in buf of the next record to decode.
    """

    def __init__(self, buf: SupportsBytes, pos: int):
        self.buf, self.pos = buf, pos

    def __iter__(self):
        return self

    def _readVarInt(self, pos: int, len: int) -> int:
        """Decodes the little-endian unsigned integer stored in the `len`
        bytes of the buffer starting at `pos`."""
        # Each byte lands in its own 8-bit lane, so adding is the same as OR-ing.
        return sum(self.buf[pos + offset] << (8 * offset) for offset in range(len))

    def __next__(self) -> DataLogRecord:
        """Decodes and returns the record at the current position.

        Raises StopIteration when the remaining bytes cannot hold a complete
        record (end of log or truncated record).
        """
        buf = self.buf
        start = self.pos
        available = len(buf)

        # Smallest possible header: the length bit field plus one byte each
        # for entry ID, payload size and timestamp.
        if start + 4 > available:
            raise StopIteration

        # The first byte packs the byte widths (minus one) of the three
        # header fields that follow it.
        widths = buf[start]
        entryLen = (widths & 0x3) + 1
        sizeLen = ((widths >> 2) & 0x3) + 1
        timestampLen = ((widths >> 4) & 0x7) + 1

        payloadStart = start + 1 + entryLen + sizeLen + timestampLen
        if payloadStart > available:
            raise StopIteration

        entryPos = start + 1
        sizePos = entryPos + entryLen
        timestampPos = sizePos + sizeLen
        entry = self._readVarInt(entryPos, entryLen)
        size = self._readVarInt(sizePos, sizeLen)
        timestamp = self._readVarInt(timestampPos, timestampLen)

        payloadEnd = payloadStart + size
        if payloadEnd > available:
            raise StopIteration

        record = DataLogRecord(entry, timestamp, buf[payloadStart:payloadEnd])
        # Advance only after the record has been built successfully.
        self.pos = payloadEnd
        return record
|
|
|
|
|
|
class DataLogReader:
    """Reader for data logs written by the DataLog class.

    buf: Buffer holding the complete log file contents.
    """

    def __init__(self, buf: SupportsBytes):
        self.buf = buf

    def __bool__(self):
        return self.isValid()

    def isValid(self) -> bool:
        """Returns true if the data log is valid (e.g. has a valid header)."""
        # The fixed header is 12 bytes: 6-byte magic, 2-byte version and a
        # 4-byte extra header length.
        if len(self.buf) < 12:
            return False
        if self.buf[:6] != b"WPILOG":
            return False
        return self.getVersion() >= 0x0100

    def getVersion(self) -> int:
        """Gets the data log version. Returns 0 if data log is invalid.

        @return Version number; most significant byte is major, least significant is
                minor (so version 1.0 will be 0x0100)"""
        if len(self.buf) >= 12:
            return int.from_bytes(self.buf[6:8], byteorder="little", signed=False)
        return 0

    def getExtraHeader(self) -> str:
        """Gets the extra header data.

        @return Extra header data (empty string if the buffer is shorter than
                the fixed header)
        """
        if len(self.buf) < 12:
            return ""
        headerLen = int.from_bytes(self.buf[8:12], byteorder="little", signed=False)
        rawHeader = self.buf[12 : 12 + headerLen]
        return str(rawHeader, encoding="utf-8")

    def __iter__(self) -> DataLogIterator:
        """Returns an iterator over the records that follow the header."""
        extraHeaderSize = int.from_bytes(
            self.buf[8:12], byteorder="little", signed=False
        )
        # Records begin right after the fixed header and the extra header.
        firstRecord = 12 + extraHeaderSize
        return DataLogIterator(self.buf, firstRecord)
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line dump tool: prints every record of the data log file named
    # on the command line in a human-readable form.
    from datetime import datetime
    import mmap
    import sys

    if len(sys.argv) != 2:
        print("Usage: datalog.py <file>", file=sys.stderr)
        sys.exit(1)

    # The log is binary data, so open it in binary mode.
    with open(sys.argv[1], "rb") as f:
        try:
            mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        except ValueError:
            # mmap refuses to map an empty file; report it like any other
            # file that does not contain a log.
            print("not a log file", file=sys.stderr)
            sys.exit(1)

        # Close the mapping when done (also on sys.exit).
        with mm:
            reader = DataLogReader(mm)
            if not reader:
                print("not a log file", file=sys.stderr)
                sys.exit(1)

            # Currently started entries, keyed by entry ID.
            entries = {}
            for record in reader:
                timestamp = record.timestamp / 1000000
                if record.isStart():
                    try:
                        data = record.getStartData()
                        print(
                            f"Start({data.entry}, name='{data.name}', type='{data.type}', metadata='{data.metadata}') [{timestamp}]"
                        )
                        if data.entry in entries:
                            print("...DUPLICATE entry ID, overriding")
                        entries[data.entry] = data
                    except (TypeError, ValueError):
                        # ValueError covers strings that are not valid UTF-8.
                        print("Start(INVALID)")
                elif record.isFinish():
                    try:
                        entry = record.getFinishEntry()
                        print(f"Finish({entry}) [{timestamp}]")
                        if entry not in entries:
                            print("...ID not found")
                        else:
                            del entries[entry]
                    except TypeError:
                        print("Finish(INVALID)")
                elif record.isSetMetadata():
                    try:
                        data = record.getSetMetadataData()
                        print(f"SetMetadata({data.entry}, '{data.metadata}') [{timestamp}]")
                        if data.entry not in entries:
                            print("...ID not found")
                    except (TypeError, ValueError):
                        # ValueError covers metadata that is not valid UTF-8.
                        print("SetMetadata(INVALID)")
                elif record.isControl():
                    print("Unrecognized control record")
                else:
                    print(f"Data({record.entry}, size={len(record.data)}) ", end="")
                    entry = entries.get(record.entry)
                    if entry is None:
                        print("<ID not found>")
                        continue
                    print(f"<name='{entry.name}', type='{entry.type}'> [{timestamp}]")

                    try:
                        # handle systemTime specially
                        if entry.name == "systemTime" and entry.type == "int64":
                            dt = datetime.fromtimestamp(record.getInteger() / 1000000)
                            print(" {:%Y-%m-%d %H:%M:%S.%f}".format(dt))
                            continue

                        if entry.type == "double":
                            print(f" {record.getDouble()}")
                        elif entry.type == "float":
                            print(f" {record.getFloat()}")
                        elif entry.type == "int64":
                            print(f" {record.getInteger()}")
                        elif entry.type in ("string", "json"):
                            print(f" '{record.getString()}'")
                        elif entry.type == "msgpack":
                            print(f" '{record.getMsgPack()}'")
                        elif entry.type == "boolean":
                            print(f" {record.getBoolean()}")
                        elif entry.type == "boolean[]":
                            arr = record.getBooleanArray()
                            print(f" {arr}")
                        elif entry.type == "double[]":
                            arr = record.getDoubleArray()
                            print(f" {arr}")
                        elif entry.type == "float[]":
                            arr = record.getFloatArray()
                            print(f" {arr}")
                        elif entry.type == "int64[]":
                            arr = record.getIntegerArray()
                            print(f" {arr}")
                        elif entry.type == "string[]":
                            arr = record.getStringArray()
                            print(f" {arr}")
                    except (TypeError, ValueError, OverflowError, OSError):
                        # TypeError: payload has the wrong size for its type.
                        # ValueError: payload is not valid UTF-8.
                        # OverflowError/OSError: systemTime value is outside
                        # the range datetime.fromtimestamp() accepts.
                        print(" invalid")
|