[wpinet] WebSocket: Utilize uv::Handle logging

This commit is contained in:
Peter Johnson
2024-01-19 18:12:24 -08:00
parent 6fab87fa4c
commit f518e143d0
3 changed files with 40 additions and 36 deletions

View File

@@ -25,14 +25,21 @@
using namespace wpi;
#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG
static std::string DebugBinary(std::span<const uint8_t> val) {
#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG_CONTENT
std::string str;
wpi::raw_string_ostream stros{str};
size_t limited = 0;
if (val.size() > 30) {
limited = val.size();
val = val.subspan(0, 30);
}
for (auto ch : val) {
stros << fmt::format("{:02x},", static_cast<unsigned int>(ch) & 0xff);
}
if (limited != 0) {
stros << fmt::format("... (total {})", limited);
}
return str;
#else
return "";
@@ -46,7 +53,6 @@ static inline std::string_view DebugText(std::string_view val) {
return "";
#endif
}
#endif // WPINET_WEBSOCKET_VERBOSE_DEBUG
class WebSocket::WriteReq : public uv::WriteReq,
public detail::WebSocketWriteReqBase {
@@ -61,7 +67,7 @@ class WebSocket::WriteReq : public uv::WriteReq,
void Send(uv::Error err) {
auto ws = m_ws.lock();
if (!ws || err) {
WS_DEBUG("no WS or error, calling callback\n");
// WS_DEBUG("no WS or error, calling callback\n");
m_frames.ReleaseBufs();
m_callback(m_userBufs, err);
return;
@@ -71,18 +77,18 @@ class WebSocket::WriteReq : public uv::WriteReq,
if (m_controlCont) {
// We have a control frame; switch to it. We will come back here via
// the control frame's m_cont when it's done.
WS_DEBUG("Continuing with a control write\n");
WS_DEBUG(ws->GetStream(), "Continuing with a control write");
auto controlCont = std::move(m_controlCont);
m_controlCont.reset();
return controlCont->Send({});
}
int result = Continue(ws->m_stream, shared_from_this());
WS_DEBUG("Continue() -> {}\n", result);
WS_DEBUG(ws->GetStream(), "Continue() -> {}", result);
if (result <= 0) {
m_frames.ReleaseBufs();
m_callback(m_userBufs, uv::Error{result});
if (result == 0 && m_cont) {
WS_DEBUG("Continuing with another write\n");
WS_DEBUG(ws->GetStream(), "Continuing with another write");
ws->m_curWriteReq = m_cont;
return m_cont->Send({});
} else {
@@ -557,7 +563,7 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
uint8_t opcode = m_header[0] & kOpMask;
switch (opcode) {
case kOpCont:
WS_DEBUG("WS Fragment {} [{}]\n", m_payload.size(),
WS_DEBUG(m_stream, "WS Fragment {} [{}]", m_payload.size(),
DebugBinary(m_payload));
switch (m_fragmentOpcode) {
case kOpText:
@@ -565,15 +571,15 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
std::string_view content{
reinterpret_cast<char*>(m_payload.data()),
m_payload.size()};
WS_DEBUG("WS RecvText(Defrag) {} ({})\n", m_payload.size(),
DebugText(content));
WS_DEBUG(m_stream, "WS RecvText(Defrag) {} ({})",
m_payload.size(), DebugText(content));
text(content, fin);
}
break;
case kOpBinary:
if (!m_combineFragments || fin) {
WS_DEBUG("WS RecvBinary(Defrag) {} ({})\n", m_payload.size(),
DebugBinary(m_payload));
WS_DEBUG(m_stream, "WS RecvBinary(Defrag) {} ({})",
m_payload.size(), DebugBinary(m_payload));
binary(m_payload, fin);
}
break;
@@ -589,34 +595,35 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
std::string_view content{reinterpret_cast<char*>(m_payload.data()),
m_payload.size()};
if (m_fragmentOpcode != 0) {
WS_DEBUG("WS RecvText {} ({}) -> INCOMPLETE FRAGMENT\n",
WS_DEBUG(m_stream, "WS RecvText {} ({}) -> INCOMPLETE FRAGMENT",
m_payload.size(), DebugText(content));
return Fail(1002, "incomplete fragment");
}
if (!m_combineFragments || fin) {
WS_DEBUG("WS RecvText {} ({})\n", m_payload.size(),
WS_DEBUG(m_stream, "WS RecvText {} ({})", m_payload.size(),
DebugText(content));
text(content, fin);
}
if (!fin) {
WS_DEBUG("WS RecvText {} StartFrag\n", m_payload.size());
WS_DEBUG(m_stream, "WS RecvText {} StartFrag", m_payload.size());
m_fragmentOpcode = opcode;
}
break;
}
case kOpBinary:
if (m_fragmentOpcode != 0) {
WS_DEBUG("WS RecvBinary {} ({}) -> INCOMPLETE FRAGMENT\n",
WS_DEBUG(m_stream, "WS RecvBinary {} ({}) -> INCOMPLETE FRAGMENT",
m_payload.size(), DebugBinary(m_payload));
return Fail(1002, "incomplete fragment");
}
if (!m_combineFragments || fin) {
WS_DEBUG("WS RecvBinary {} ({})\n", m_payload.size(),
WS_DEBUG(m_stream, "WS RecvBinary {} ({})", m_payload.size(),
DebugBinary(m_payload));
binary(m_payload, fin);
}
if (!fin) {
WS_DEBUG("WS RecvBinary {} StartFrag\n", m_payload.size());
WS_DEBUG(m_stream, "WS RecvBinary {} StartFrag",
m_payload.size());
m_fragmentOpcode = opcode;
}
break;
@@ -664,7 +671,7 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
}
});
}
WS_DEBUG("WS RecvPing() {} ({})\n", m_controlPayload.size(),
WS_DEBUG(m_stream, "WS RecvPing() {} ({})", m_controlPayload.size(),
DebugBinary(m_controlPayload));
ping(m_controlPayload);
break;
@@ -672,7 +679,7 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
if (!fin) {
return Fail(1002, "cannot fragment control frames");
}
WS_DEBUG("WS RecvPong() {} ({})\n", m_controlPayload.size(),
WS_DEBUG(m_stream, "WS RecvPong() {} ({})", m_controlPayload.size(),
DebugBinary(m_controlPayload));
pong(m_controlPayload);
break;
@@ -738,7 +745,7 @@ void WebSocket::SendFrames(
std::span<const Frame> frames,
std::function<void(std::span<uv::Buffer>, uv::Error)> callback) {
// If we're not open, emit an error and don't send the data
WS_DEBUG("SendFrames({})\n", frames.size());
WS_DEBUG(m_stream, "SendFrames({})", frames.size());
if (m_state != OPEN) {
SendError(frames, callback);
return;

View File

@@ -4,18 +4,15 @@
#pragma once
#include <fmt/format.h>
#include <wpi/Logger.h>
// #define WPINET_WEBSOCKET_VERBOSE_DEBUG
// #define WPINET_WEBSOCKET_VERBOSE_DEBUG_CONTENT
#define WPINET_WEBSOCKET_VERBOSE_DEBUG_CONTENT
#ifdef __clang__
#pragma clang diagnostic ignored "-Wgnu-zero-variadic-macro-arguments"
#endif
#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG
#define WS_DEBUG(format, ...) \
::fmt::print(FMT_STRING(format) __VA_OPT__(, ) __VA_ARGS__)
#else
#define WS_DEBUG(fmt, ...)
#endif
#define WS_DEBUG(stream, format, ...) \
if (auto logger_ = stream.GetLogger()) { \
WPI_DEBUG4(*logger_, "WS: " format __VA_OPT__(, ) __VA_ARGS__) \
}

View File

@@ -73,7 +73,7 @@ int WebSocketWriteReqBase::Continue(Stream& stream, std::shared_ptr<Req> req) {
}
int sentBytes = stream.TryWrite(bufs);
WS_DEBUG("TryWrite({}) -> {} (expected {})\n", bufs.size(), sentBytes,
WS_DEBUG(stream, "TryWrite({}) -> {} (expected {})", bufs.size(), sentBytes,
numBytes);
if (sentBytes < 0) {
return sentBytes; // error
@@ -133,10 +133,10 @@ int WebSocketWriteReqBase::Continue(Stream& stream, std::shared_ptr<Req> req) {
m_continueBufPos += bufIt - bufs.begin();
if (writeBufs.empty()) {
WS_DEBUG("Write Done\n");
WS_DEBUG(stream, "Write Done");
return 0;
}
WS_DEBUG("Write({})\n", writeBufs.size());
WS_DEBUG(stream, "Write({})", writeBufs.size());
stream.Write(writeBufs, req);
return 1;
}
@@ -146,7 +146,7 @@ std::span<const WebSocket::Frame> TrySendFrames(
bool server, Stream& stream, std::span<const WebSocket::Frame> frames,
MakeReq&& makeReq,
std::function<void(std::span<uv::Buffer>, uv::Error)> callback) {
WS_DEBUG("TrySendFrames({})\n", frames.size());
WS_DEBUG(stream, "TrySendFrames({})", frames.size());
auto frameIt = frames.begin();
auto frameEnd = frames.end();
while (frameIt != frameEnd) {
@@ -168,8 +168,8 @@ std::span<const WebSocket::Frame> TrySendFrames(
// try to send
int sentBytes = stream.TryWrite(sendFrames.m_bufs);
WS_DEBUG("TryWrite({}) -> {} (expected {})\n", sendFrames.m_bufs.size(),
sentBytes, numBytes);
WS_DEBUG(stream, "TryWrite({}) -> {} (expected {})",
sendFrames.m_bufs.size(), sentBytes, numBytes);
if (sentBytes == 0) {
// we haven't started a frame yet; clean up any bufs that have actually
@@ -281,7 +281,7 @@ std::span<const WebSocket::Frame> TrySendFrames(
req->m_userBufs.append(it->data.begin(), it->data.end());
}
WS_DEBUG("Write({})\n", writeBufs.size());
WS_DEBUG(stream, "Write({})", writeBufs.size());
stream.Write(writeBufs, req);
#ifdef __clang__
// work around clang bug