[hal] Various CAN fixes (#8043)

This commit is contained in:
Thad House
2025-07-14 23:46:57 -07:00
committed by GitHub
parent 3497a7d09f
commit 8d248f61fd
9 changed files with 187 additions and 58 deletions

View File

@@ -18,6 +18,7 @@
#include <vector>
#include <wpi/DenseMap.h>
#include <wpi/circular_buffer.h>
#include <wpi/mutex.h>
#include <wpi/print.h>
#include <wpi/timestamp.h>
@@ -25,10 +26,13 @@
#include "PortsInternal.h"
#include "hal/Errors.h"
#include "hal/Threads.h"
#include "hal/handles/UnlimitedHandleResource.h"
#include "wpinet/EventLoopRunner.h"
#include "wpinet/uv/Poll.h"
#include "wpinet/uv/Timer.h"
using namespace hal;
namespace {
static constexpr uint32_t MatchingBitMask = CAN_EFF_MASK | CAN_RTR_FLAG;
@@ -60,9 +64,23 @@ uint32_t MapSocketCanToMessageId(uint32_t id) {
return toRet;
}
struct FrameStore {
canfd_frame frame;
uint64_t timestamp{0};
struct CANStreamStorage {
CANStreamStorage(uint32_t maxMessages, uint8_t busId, uint32_t mask,
uint32_t filter)
: receivedMessages{maxMessages},
allowedMessages{maxMessages},
canBusId{busId},
canMask{mask},
canFilter{filter} {}
wpi::circular_buffer<struct HAL_CANStreamMessage> receivedMessages;
bool didOverflow{false};
uint32_t allowedMessages;
uint8_t canBusId;
uint32_t canMask;
uint32_t canFilter;
void CheckFrame(const HAL_CANStreamMessage& message);
};
struct SocketCanState {
@@ -82,7 +100,8 @@ struct SocketCanState {
wpi::mutex readMutex[hal::kNumCanBuses];
// TODO(thadhouse) we need a MUCH better way of doing this masking
wpi::DenseMap<uint32_t, FrameStore> readFrames[hal::kNumCanBuses];
wpi::DenseMap<uint32_t, HAL_CANStreamMessage> readFrames[hal::kNumCanBuses];
std::vector<CANStreamStorage*> canStreams[hal::kNumCanBuses];
bool InitializeBuses();
@@ -95,14 +114,36 @@ struct SocketCanState {
} // namespace
static UnlimitedHandleResource<HAL_CANStreamHandle, CANStreamStorage,
HAL_HandleEnum::CANStream>* canStreamHandles;
static SocketCanState* canState;
namespace hal::init {
void InitializeCAN() {
canState = new SocketCanState{};
static UnlimitedHandleResource<HAL_CANStreamHandle, CANStreamStorage,
HAL_HandleEnum::CANStream>
cSH;
canStreamHandles = &cSH;
}
} // namespace hal::init
void CANStreamStorage::CheckFrame(const HAL_CANStreamMessage& message) {
if ((message.messageId & canMask) != canFilter) {
return;
}
// We already are holding the lock here.
if (receivedMessages.size() == allowedMessages) {
didOverflow = true;
return;
}
receivedMessages.push_back(message);
}
bool SocketCanState::InitializeBuses() {
bool success = true;
readLoopRunner.ExecSync([this, &success](wpi::uv::Loop& loop) {
@@ -152,33 +193,51 @@ bool SocketCanState::InitializeBuses() {
return;
}
poll->pollEvent.connect(
[this, fd = socketHandle[i], canIndex = i](int mask) {
if (mask & UV_READABLE) {
canfd_frame frame;
int rVal = read(fd, &frame, sizeof(frame));
if (rVal <= 0) {
// TODO(thadhouse) error handling
return;
}
if (frame.can_id & CAN_ERR_FLAG) {
// Do nothing if this is an error frame
return;
}
poll->pollEvent.connect([this, fd = socketHandle[i],
canIndex = i](int mask) {
if (mask & UV_READABLE) {
canfd_frame frame;
int rVal = read(fd, &frame, sizeof(frame));
if (rVal <= 0) {
// TODO(thadhouse) error handling
return;
}
if (frame.can_id & CAN_ERR_FLAG) {
// Do nothing if this is an error frame
return;
}
uint32_t messageId = MapSocketCanToMessageId(frame.can_id);
uint64_t timestamp = wpi::Now();
// Ensure FDF flag is set for the read later.
if (rVal == CANFD_MTU) {
frame.flags = CANFD_FDF;
}
uint32_t messageId = MapSocketCanToMessageId(frame.can_id);
uint64_t timestamp = wpi::Now();
// Ensure FDF flag is set for the read later.
if (rVal == CANFD_MTU) {
frame.flags = CANFD_FDF;
}
std::scoped_lock lock{readMutex[canIndex]};
auto& msg = readFrames[canIndex][messageId];
msg.frame = frame;
msg.timestamp = timestamp;
}
});
std::scoped_lock lock{readMutex[canIndex]};
auto& msg = readFrames[canIndex][messageId];
msg.messageId = messageId;
msg.message.timeStamp = timestamp;
msg.message.message.flags = HAL_CANFlags::HAL_CAN_NO_FLAGS;
msg.message.message.flags |= (frame.flags & CANFD_FDF)
? HAL_CANFlags::HAL_CAN_FD_DATALENGTH
: HAL_CANFlags::HAL_CAN_NO_FLAGS;
msg.message.message.flags |=
(frame.flags & CANFD_BRS) ? HAL_CANFlags::HAL_CAN_FD_BITRATESWITCH
: HAL_CANFlags::HAL_CAN_NO_FLAGS;
msg.message.message.dataSize = frame.len;
if (frame.len > 0) {
std::memcpy(msg.message.message.data, frame.data, frame.len);
}
for (auto&& stream : canStreams[canIndex]) {
stream->CheckFrame(msg);
}
}
});
poll->Start(UV_READABLE);
}
@@ -287,7 +346,20 @@ void HAL_CAN_SendMessage(int32_t busId, uint32_t messageId,
std::scoped_lock lock{canState->writeMutex[busId]};
int result = send(canState->socketHandle[busId], &frame, mtu, 0);
if (result != mtu) {
// TODO(thadhouse) better error
if (result == -1) {
int err = errno;
if (err == ENOBUFS) {
*status = HAL_WARN_CANSessionMux_TxQueueFull;
return;
} else if (err == EAGAIN || err == EWOULDBLOCK) {
*status = HAL_WARN_CANSessionMux_SocketBufferFull;
return;
}
}
// Print is here, and we can better debug this in the future.
std::printf("Send Error %d %d %s\n", result, errno, std::strerror(errno));
std::fflush(stdout);
*status = HAL_ERR_CANSessionMux_InvalidBuffer;
return;
}
@@ -313,46 +385,91 @@ void HAL_CAN_ReceiveMessage(int32_t busId, uint32_t messageId,
std::scoped_lock lock{canState->readMutex[busId]};
auto& msg = canState->readFrames[busId][messageId];
if (msg.timestamp == 0) {
if (msg.message.timeStamp == 0) {
message->message.dataSize = 0;
message->timeStamp = 0;
*status = HAL_ERR_CANSessionMux_MessageNotFound;
return;
}
message->message.flags = HAL_CANFlags::HAL_CAN_NO_FLAGS;
message->message.flags |= (msg.frame.flags & CANFD_FDF)
? HAL_CANFlags::HAL_CAN_FD_DATALENGTH
: HAL_CANFlags::HAL_CAN_NO_FLAGS;
message->message.flags |= (msg.frame.flags & CANFD_BRS)
? HAL_CANFlags::HAL_CAN_FD_BITRATESWITCH
: HAL_CANFlags::HAL_CAN_NO_FLAGS;
*message = msg.message;
msg.message.timeStamp = 0;
message->message.dataSize = msg.frame.len;
if (msg.frame.len > 0) {
std::memcpy(message->message.data, msg.frame.data, msg.frame.len);
}
message->timeStamp = msg.timestamp;
*status = 0;
msg.timestamp = 0;
return;
}
HAL_CANStreamHandle HAL_CAN_OpenStreamSession(int32_t busId, uint32_t messageId,
uint32_t messageIDMask,
uint32_t messageIdMask,
uint32_t maxMessages,
int32_t* status) {
*status = HAL_HANDLE_ERROR;
return 0;
if (busId < 0 || busId >= hal::kNumCanBuses) {
*status = PARAMETER_OUT_OF_RANGE;
return HAL_kInvalidHandle;
}
auto can = std::make_shared<CANStreamStorage>(maxMessages, busId,
messageIdMask, messageId);
auto handle = canStreamHandles->Allocate(can);
if (handle == HAL_kInvalidHandle) {
*status = NO_AVAILABLE_RESOURCES;
return HAL_kInvalidHandle;
}
std::scoped_lock lock{canState->readMutex[can->canBusId]};
canState->canStreams[can->canBusId].push_back(can.get());
return handle;
}
void HAL_CAN_CloseStreamSession(HAL_CANStreamHandle sessionHandle) {}
void HAL_CAN_CloseStreamSession(HAL_CANStreamHandle sessionHandle) {
auto can = canStreamHandles->Free(sessionHandle);
if (can == nullptr) {
return;
}
std::scoped_lock lock{canState->readMutex[can->canBusId]};
std::erase(canState->canStreams[can->canBusId], can.get());
}
void HAL_CAN_ReadStreamSession(HAL_CANStreamHandle sessionHandle,
struct HAL_CANStreamMessage* messages,
uint32_t messagesToRead, uint32_t* messagesRead,
int32_t* status) {
*status = HAL_HANDLE_ERROR;
if (messages == nullptr || messagesRead == nullptr) {
*status = PARAMETER_OUT_OF_RANGE;
return;
}
auto can = canStreamHandles->Get(sessionHandle);
if (!can) {
*status = HAL_HANDLE_ERROR;
return;
}
std::scoped_lock lock{canState->readMutex[can->canBusId]};
size_t readCount = (std::min)(static_cast<size_t>(messagesToRead),
can->receivedMessages.size());
for (size_t i = 0; i < readCount; i++) {
messages[i] = can->receivedMessages.pop_front();
}
*messagesRead = readCount;
if (can->didOverflow) {
can->didOverflow = false;
*status = HAL_ERR_CANSessionMux_SessionOverrun;
}
return;
}
void HAL_CAN_GetCANStatus(int32_t busId, float* percentBusUtilization,
uint32_t* busOffCount, uint32_t* txFullCount,
uint32_t* receiveErrorCount,