diff --git a/hal/src/main/java/edu/wpi/first/hal/can/CANStreamMessage.java b/hal/src/main/java/edu/wpi/first/hal/can/CANStreamMessage.java index 32a9c30cf8..31ff39de92 100644 --- a/hal/src/main/java/edu/wpi/first/hal/can/CANStreamMessage.java +++ b/hal/src/main/java/edu/wpi/first/hal/can/CANStreamMessage.java @@ -8,9 +8,9 @@ package edu.wpi.first.hal.can; public class CANStreamMessage { /** The message data. */ @SuppressWarnings("MemberName") - public final byte[] data = new byte[8]; + public final byte[] data = new byte[64]; - /** The length of the data received (0-8 bytes). */ + /** The length of the data received (0-64 bytes). */ @SuppressWarnings("MemberName") public int length; diff --git a/hal/src/main/native/cpp/jni/CANJNI.cpp b/hal/src/main/native/cpp/jni/CANJNI.cpp index f3d56770a6..f6bd5c3021 100644 --- a/hal/src/main/native/cpp/jni/CANJNI.cpp +++ b/hal/src/main/native/cpp/jni/CANJNI.cpp @@ -214,8 +214,9 @@ Java_edu_wpi_first_hal_can_CANJNI_readCANStreamSession } } JLocal toSetArray{ - env, SetCANStreamObject(env, elem, msg->message.message.dataSize, - msg->messageId, msg->message.timeStamp)}; + env, SetCANStreamObject(env, elem, msg->message.message.flags, + msg->message.message.dataSize, msg->messageId, + msg->message.timeStamp)}; auto javaLen = env->GetArrayLength(toSetArray); if (javaLen < msg->message.message.dataSize) { ThrowIllegalArgumentException( diff --git a/hal/src/main/native/cpp/jni/HALUtil.cpp b/hal/src/main/native/cpp/jni/HALUtil.cpp index a9f2bce392..cca5bf41cb 100644 --- a/hal/src/main/native/cpp/jni/HALUtil.cpp +++ b/hal/src/main/native/cpp/jni/HALUtil.cpp @@ -235,13 +235,13 @@ jbyteArray SetCANReceiveMessageObject(JNIEnv* env, jobject canData, } jbyteArray SetCANStreamObject(JNIEnv* env, jobject canStreamData, - int32_t length, uint32_t messageId, + int32_t length, int32_t flags, uint32_t messageId, uint64_t timestamp) { static jmethodID func = - env->GetMethodID(canStreamMessageCls, "setStreamData", "(IIJ)[B"); + env->GetMethodID(canStreamMessageCls, "setStreamData", "(IIIJ)[B"); jbyteArray retVal = static_cast(env->CallObjectMethod( - canStreamData, func, static_cast(length), + canStreamData, func, static_cast(length), static_cast(flags), static_cast(messageId), static_cast(timestamp))); return retVal; } diff --git a/hal/src/main/native/cpp/jni/HALUtil.h b/hal/src/main/native/cpp/jni/HALUtil.h index 6a440f5ea0..d8470a2683 100644 --- a/hal/src/main/native/cpp/jni/HALUtil.h +++ b/hal/src/main/native/cpp/jni/HALUtil.h @@ -67,7 +67,7 @@ jbyteArray SetCANReceiveMessageObject(JNIEnv* env, jobject canData, uint64_t timestamp); jbyteArray SetCANStreamObject(JNIEnv* env, jobject canStreamData, - int32_t length, uint32_t messageId, + int32_t length, int32_t flags, uint32_t messageId, uint64_t timestamp); jobject CreateHALValue(JNIEnv* env, const HAL_Value& value); diff --git a/hal/src/main/native/include/hal/CAN.h b/hal/src/main/native/include/hal/CAN.h index c610be8c41..627c05ad60 100644 --- a/hal/src/main/native/include/hal/CAN.h +++ b/hal/src/main/native/include/hal/CAN.h @@ -19,6 +19,8 @@ #define HAL_ERR_CANSessionMux_InvalidBuffer -44086 #define HAL_ERR_CANSessionMux_MessageNotFound -44087 #define HAL_WARN_CANSessionMux_NoToken 44087 +#define HAL_WARN_CANSessionMux_TxQueueFull 44086 +#define HAL_WARN_CANSessionMux_SocketBufferFull 44088 #define HAL_ERR_CANSessionMux_NotAllowed -44088 #define HAL_ERR_CANSessionMux_NotInitialized -44089 #define HAL_ERR_CANSessionMux_SessionOverrun 44050 diff --git a/hal/src/main/native/include/hal/Errors.h b/hal/src/main/native/include/hal/Errors.h index b24bfb7126..6dc461b02c 100644 --- a/hal/src/main/native/include/hal/Errors.h +++ b/hal/src/main/native/include/hal/Errors.h @@ -35,6 +35,10 @@ #define WARN_CANSessionMux_NoToken_MESSAGE "CAN: No token" #define ERR_CANSessionMux_NotAllowed_MESSAGE "CAN: Not allowed" #define ERR_CANSessionMux_NotInitialized_MESSAGE "CAN: Not initialized" +#define HAL_WARN_CANSessionMux_TxQueueFull_MESSAGE \ + "CAN: TX Queue full. Generally caused by a disconnected bus." +#define HAL_WARN_CANSessionMux_SocketBufferFull_MESSAGE \ + "CAN: Socket Buffer full. Generally caused by sending too many packets." #define ERR_FRCSystem_NetCommNotResponding_MESSAGE \ "FRCSystem: NetComm not responding" diff --git a/hal/src/main/native/include/hal/handles/HandlesInternal.h b/hal/src/main/native/include/hal/handles/HandlesInternal.h index 5782b771eb..b3b8875b04 100644 --- a/hal/src/main/native/include/hal/handles/HandlesInternal.h +++ b/hal/src/main/native/include/hal/handles/HandlesInternal.h @@ -72,6 +72,7 @@ enum class HAL_HandleEnum { CTREPDP = 25, REVPDH = 26, REVPH = 27, + CANStream = 28, }; /** diff --git a/hal/src/main/native/systemcore/CAN.cpp b/hal/src/main/native/systemcore/CAN.cpp index 2ce7a8f370..1b6bc4b290 100644 --- a/hal/src/main/native/systemcore/CAN.cpp +++ b/hal/src/main/native/systemcore/CAN.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -25,10 +26,13 @@ #include "PortsInternal.h" #include "hal/Errors.h" #include "hal/Threads.h" +#include "hal/handles/UnlimitedHandleResource.h" #include "wpinet/EventLoopRunner.h" #include "wpinet/uv/Poll.h" #include "wpinet/uv/Timer.h" +using namespace hal; + namespace { static constexpr uint32_t MatchingBitMask = CAN_EFF_MASK | CAN_RTR_FLAG; @@ -60,9 +64,23 @@ uint32_t MapSocketCanToMessageId(uint32_t id) { return toRet; } -struct FrameStore { - canfd_frame frame; - uint64_t timestamp{0}; +struct CANStreamStorage { + CANStreamStorage(uint32_t maxMessages, uint8_t busId, uint32_t mask, + uint32_t filter) + : receivedMessages{maxMessages}, + allowedMessages{maxMessages}, + canBusId{busId}, + canMask{mask}, + canFilter{filter} {} + + wpi::circular_buffer receivedMessages; + bool didOverflow{false}; + uint32_t allowedMessages; + uint8_t canBusId; + uint32_t canMask; + uint32_t canFilter; + + void CheckFrame(const HAL_CANStreamMessage& message); }; struct SocketCanState { @@ -82,7 +100,8 @@ struct SocketCanState { wpi::mutex readMutex[hal::kNumCanBuses]; // TODO(thadhouse) we need a MUCH better way of doing this masking - wpi::DenseMap readFrames[hal::kNumCanBuses]; + wpi::DenseMap readFrames[hal::kNumCanBuses]; + std::vector canStreams[hal::kNumCanBuses]; bool InitializeBuses(); @@ -95,14 +114,36 @@ struct SocketCanState { } // namespace +static UnlimitedHandleResource* canStreamHandles; + static SocketCanState* canState; namespace hal::init { void InitializeCAN() { canState = new SocketCanState{}; + static UnlimitedHandleResource + cSH; + canStreamHandles = &cSH; } } // namespace hal::init +void CANStreamStorage::CheckFrame(const HAL_CANStreamMessage& message) { + if ((message.messageId & canMask) != canFilter) { + return; + } + + // We already are holding the lock here. + + if (receivedMessages.size() == allowedMessages) { + didOverflow = true; + return; + } + + receivedMessages.push_back(message); +} + bool SocketCanState::InitializeBuses() { bool success = true; readLoopRunner.ExecSync([this, &success](wpi::uv::Loop& loop) { @@ -152,33 +193,51 @@ bool SocketCanState::InitializeBuses() { return; } - poll->pollEvent.connect( - [this, fd = socketHandle[i], canIndex = i](int mask) { - if (mask & UV_READABLE) { - canfd_frame frame; - int rVal = read(fd, &frame, sizeof(frame)); - if (rVal <= 0) { - // TODO(thadhouse) error handling - return; - } - if (frame.can_id & CAN_ERR_FLAG) { - // Do nothing if this is an error frame - return; - } + poll->pollEvent.connect([this, fd = socketHandle[i], + canIndex = i](int mask) { + if (mask & UV_READABLE) { + canfd_frame frame; + int rVal = read(fd, &frame, sizeof(frame)); + if (rVal <= 0) { + // TODO(thadhouse) error handling + return; + } + if (frame.can_id & CAN_ERR_FLAG) { + // Do nothing if this is an error frame + return; + } - uint32_t messageId = MapSocketCanToMessageId(frame.can_id); - uint64_t timestamp = wpi::Now(); - // Ensure FDF flag is set for the read later. - if (rVal == CANFD_MTU) { - frame.flags = CANFD_FDF; - } + uint32_t messageId = MapSocketCanToMessageId(frame.can_id); + uint64_t timestamp = wpi::Now(); + // Ensure FDF flag is set for the read later. + if (rVal == CANFD_MTU) { + frame.flags = CANFD_FDF; + } - std::scoped_lock lock{readMutex[canIndex]}; - auto& msg = readFrames[canIndex][messageId]; - msg.frame = frame; - msg.timestamp = timestamp; - } - }); + std::scoped_lock lock{readMutex[canIndex]}; + auto& msg = readFrames[canIndex][messageId]; + + msg.messageId = messageId; + msg.message.timeStamp = timestamp; + + msg.message.message.flags = HAL_CANFlags::HAL_CAN_NO_FLAGS; + msg.message.message.flags |= (frame.flags & CANFD_FDF) + ? HAL_CANFlags::HAL_CAN_FD_DATALENGTH + : HAL_CANFlags::HAL_CAN_NO_FLAGS; + msg.message.message.flags |= + (frame.flags & CANFD_BRS) ? HAL_CANFlags::HAL_CAN_FD_BITRATESWITCH + : HAL_CANFlags::HAL_CAN_NO_FLAGS; + + msg.message.message.dataSize = frame.len; + if (frame.len > 0) { + std::memcpy(msg.message.message.data, frame.data, frame.len); + } + + for (auto&& stream : canStreams[canIndex]) { + stream->CheckFrame(msg); + } + } + }); poll->Start(UV_READABLE); } @@ -287,7 +346,20 @@ void HAL_CAN_SendMessage(int32_t busId, uint32_t messageId, std::scoped_lock lock{canState->writeMutex[busId]}; int result = send(canState->socketHandle[busId], &frame, mtu, 0); if (result != mtu) { - // TODO(thadhouse) better error + if (result == -1) { + int err = errno; + if (err == ENOBUFS) { + *status = HAL_WARN_CANSessionMux_TxQueueFull; + return; + } else if (err == EAGAIN || err == EWOULDBLOCK) { + *status = HAL_WARN_CANSessionMux_SocketBufferFull; + return; + } + } + + // Print is here, and we can better debug this in the future. + std::printf("Send Error %d %d %s\n", result, errno, std::strerror(errno)); + std::fflush(stdout); *status = HAL_ERR_CANSessionMux_InvalidBuffer; return; } @@ -313,46 +385,91 @@ void HAL_CAN_ReceiveMessage(int32_t busId, uint32_t messageId, std::scoped_lock lock{canState->readMutex[busId]}; auto& msg = canState->readFrames[busId][messageId]; - if (msg.timestamp == 0) { + if (msg.message.timeStamp == 0) { message->message.dataSize = 0; message->timeStamp = 0; *status = HAL_ERR_CANSessionMux_MessageNotFound; return; } - message->message.flags = HAL_CANFlags::HAL_CAN_NO_FLAGS; - message->message.flags |= (msg.frame.flags & CANFD_FDF) - ? HAL_CANFlags::HAL_CAN_FD_DATALENGTH - : HAL_CANFlags::HAL_CAN_NO_FLAGS; - message->message.flags |= (msg.frame.flags & CANFD_BRS) - ? HAL_CANFlags::HAL_CAN_FD_BITRATESWITCH - : HAL_CANFlags::HAL_CAN_NO_FLAGS; + *message = msg.message; + msg.message.timeStamp = 0; - message->message.dataSize = msg.frame.len; - if (msg.frame.len > 0) { - std::memcpy(message->message.data, msg.frame.data, msg.frame.len); - } - - message->timeStamp = msg.timestamp; *status = 0; - msg.timestamp = 0; return; } + HAL_CANStreamHandle HAL_CAN_OpenStreamSession(int32_t busId, uint32_t messageId, - uint32_t messageIDMask, + uint32_t messageIdMask, uint32_t maxMessages, int32_t* status) { - *status = HAL_HANDLE_ERROR; - return 0; + if (busId < 0 || busId >= hal::kNumCanBuses) { + *status = PARAMETER_OUT_OF_RANGE; + return HAL_kInvalidHandle; + } + + auto can = std::make_shared(maxMessages, busId, + messageIdMask, messageId); + + auto handle = canStreamHandles->Allocate(can); + + if (handle == HAL_kInvalidHandle) { + *status = NO_AVAILABLE_RESOURCES; + return HAL_kInvalidHandle; + } + + std::scoped_lock lock{canState->readMutex[can->canBusId]}; + + canState->canStreams[can->canBusId].push_back(can.get()); + + return handle; } -void HAL_CAN_CloseStreamSession(HAL_CANStreamHandle sessionHandle) {} + +void HAL_CAN_CloseStreamSession(HAL_CANStreamHandle sessionHandle) { + auto can = canStreamHandles->Free(sessionHandle); + + if (can == nullptr) { + return; + } + + std::scoped_lock lock{canState->readMutex[can->canBusId]}; + std::erase(canState->canStreams[can->canBusId], can.get()); +} + void HAL_CAN_ReadStreamSession(HAL_CANStreamHandle sessionHandle, struct HAL_CANStreamMessage* messages, uint32_t messagesToRead, uint32_t* messagesRead, int32_t* status) { - *status = HAL_HANDLE_ERROR; + if (messages == nullptr || messagesRead == nullptr) { + *status = PARAMETER_OUT_OF_RANGE; + return; + } + + auto can = canStreamHandles->Get(sessionHandle); + if (!can) { + *status = HAL_HANDLE_ERROR; + return; + } + + std::scoped_lock lock{canState->readMutex[can->canBusId]}; + + size_t readCount = (std::min)(static_cast(messagesToRead), + can->receivedMessages.size()); + + for (size_t i = 0; i < readCount; i++) { + messages[i] = can->receivedMessages.pop_front(); + } + + *messagesRead = readCount; + + if (can->didOverflow) { + can->didOverflow = false; + *status = HAL_ERR_CANSessionMux_SessionOverrun; + } + return; } + void HAL_CAN_GetCANStatus(int32_t busId, float* percentBusUtilization, uint32_t* busOffCount, uint32_t* txFullCount, uint32_t* receiveErrorCount, diff --git a/hal/src/main/native/systemcore/HAL.cpp b/hal/src/main/native/systemcore/HAL.cpp index e0db58155c..9e8de2afd1 100644 --- a/hal/src/main/native/systemcore/HAL.cpp +++ b/hal/src/main/native/systemcore/HAL.cpp @@ -130,6 +130,10 @@ const char* HAL_GetErrorMessage(int32_t code) { return ERR_CANSessionMux_NotAllowed_MESSAGE; case HAL_ERR_CANSessionMux_NotInitialized: return ERR_CANSessionMux_NotInitialized_MESSAGE; + case HAL_WARN_CANSessionMux_TxQueueFull: + return HAL_WARN_CANSessionMux_TxQueueFull_MESSAGE; + case HAL_WARN_CANSessionMux_SocketBufferFull: + return HAL_WARN_CANSessionMux_SocketBufferFull_MESSAGE; case HAL_PWM_SCALE_ERROR: return HAL_PWM_SCALE_ERROR_MESSAGE; case HAL_SERIAL_PORT_NOT_FOUND: