diff --git a/wpinet/src/main/native/cpp/WebSocket.cpp b/wpinet/src/main/native/cpp/WebSocket.cpp index bbb98979e6..369722d963 100644 --- a/wpinet/src/main/native/cpp/WebSocket.cpp +++ b/wpinet/src/main/native/cpp/WebSocket.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include "wpinet/HttpParser.h" @@ -26,20 +27,22 @@ class WebSocketWriteReq : public uv::WriteReq { std::function, uv::Error)> callback) : m_callback{std::move(callback)} { finish.connect([this](uv::Error err) { - span bufs{m_bufs}; - for (auto&& buf : bufs.subspan(0, m_startUser)) { + for (auto&& buf : m_internalBufs) { buf.Deallocate(); } - m_callback(bufs.subspan(m_startUser), err); + m_callback(m_userBufs, err); }); } std::function, uv::Error)> m_callback; - SmallVector m_bufs; - size_t m_startUser; + SmallVector m_internalBufs; + SmallVector m_userBufs; }; } // namespace +static constexpr uint8_t kFlagMasking = 0x80; +static constexpr uint8_t kLenMask = 0x7f; + class WebSocket::ClientHandshakeData { public: ClientHandshakeData() { @@ -502,6 +505,12 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) { return Fail(1002, "incomplete fragment"); } if (!m_combineFragments || fin) { +#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG + fmt::print( + "WS RecvText({})\n", + std::string_view{reinterpret_cast(m_payload.data()), + m_payload.size()}); +#endif text(std::string_view{reinterpret_cast(m_payload.data()), m_payload.size()}, fin); @@ -515,6 +524,15 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) { return Fail(1002, "incomplete fragment"); } if (!m_combineFragments || fin) { +#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG + SmallString<128> str; + raw_svector_ostream stros{str}; + for (auto ch : m_payload) { + stros << fmt::format("{:02x},", + static_cast(ch) & 0xff); + } + fmt::print("WS RecvBinary({})\n", str.str()); +#endif binary(m_payload, fin); } if (!fin) { @@ -576,24 +594,30 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) { } } -void WebSocket::Send( - uint8_t opcode, span data, - std::function, uv::Error)> callback) { - // If we're not open, emit an error and don't send the data - if (m_state != OPEN) { - int err; - if (m_state == CONNECTING) { - err = UV_EAGAIN; - } else { - err = UV_ESHUTDOWN; - } - SmallVector bufs{data.begin(), data.end()}; - callback(bufs, uv::Error{err}); - return; - } +static void WriteFrame(WebSocketWriteReq& req, + SmallVectorImpl& bufs, bool server, + uint8_t opcode, span data) { + SmallVector internalBufs; + raw_uv_ostream os{internalBufs, 4096}; - auto req = std::make_shared(std::move(callback)); - raw_uv_ostream os{req->m_bufs, 4096}; +#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG + if ((opcode & 0x7f) == 0x01) { + SmallString<128> str; + for (auto&& d : data) { + str.append(std::string_view(d.base, d.len)); + } + fmt::print("WS SendText({})\n", str.str()); + } else if ((opcode & 0x7f) == 0x02) { + SmallString<128> str; + raw_svector_ostream stros{str}; + for (auto&& d : data) { + for (auto ch : d.data()) { + stros << fmt::format("{:02x},", static_cast(ch) & 0xff); + } + } + fmt::print("WS SendBinary({})\n", str.str()); + } +#endif // opcode (includes FIN bit) os << static_cast(opcode); @@ -604,14 +628,14 @@ void WebSocket::Send( size += buf.len; } if (size < 126) { - os << static_cast((m_server ? 0x00 : kFlagMasking) | size); + os << static_cast((server ? 0x00 : kFlagMasking) | size); } else if (size <= 0xffff) { - os << static_cast((m_server ? 0x00 : kFlagMasking) | 126); + os << static_cast((server ? 0x00 : kFlagMasking) | 126); const uint8_t sizeMsb[] = {static_cast((size >> 8) & 0xff), static_cast(size & 0xff)}; os << span{sizeMsb}; } else { - os << static_cast((m_server ? 0x00 : kFlagMasking) | 127); + os << static_cast((server ? 0x00 : kFlagMasking) | 127); const uint8_t sizeMsb[] = {static_cast((size >> 56) & 0xff), static_cast((size >> 48) & 0xff), static_cast((size >> 40) & 0xff), @@ -624,7 +648,7 @@ void WebSocket::Send( } // clients need to mask the input data - if (!m_server) { + if (!server) { // generate masking key static std::random_device rd; static std::default_random_engine gen{rd()}; @@ -644,14 +668,40 @@ void WebSocket::Send( } } } - req->m_startUser = req->m_bufs.size(); - req->m_bufs.append(data.begin(), data.end()); + bufs.append(internalBufs.begin(), internalBufs.end()); // don't send the user bufs as we copied their data - m_stream.Write(span{req->m_bufs}.subspan(0, req->m_startUser), req); } else { + bufs.append(internalBufs.begin(), internalBufs.end()); // servers can just send the buffers directly without masking - req->m_startUser = req->m_bufs.size(); - req->m_bufs.append(data.begin(), data.end()); - m_stream.Write(req->m_bufs, req); + bufs.append(data.begin(), data.end()); } + req.m_internalBufs.append(internalBufs.begin(), internalBufs.end()); + req.m_userBufs.append(data.begin(), data.end()); +} + +void WebSocket::SendFrames( + span frames, + std::function, uv::Error)> callback) { + // If we're not open, emit an error and don't send the data + if (m_state != OPEN) { + int err; + if (m_state == CONNECTING) { + err = UV_EAGAIN; + } else { + err = UV_ESHUTDOWN; + } + SmallVector bufs; + for (auto&& frame : frames) { + bufs.append(frame.data.begin(), frame.data.end()); + } + callback(bufs, uv::Error{err}); + return; + } + + auto req = std::make_shared(std::move(callback)); + SmallVector bufs; + for (auto&& frame : frames) { + WriteFrame(*req, bufs, m_server, frame.opcode, frame.data); + } + m_stream.Write(bufs, req); } diff --git a/wpinet/src/main/native/include/wpinet/WebSocket.h b/wpinet/src/main/native/include/wpinet/WebSocket.h index 622d001328..931910668c 100644 --- a/wpinet/src/main/native/include/wpinet/WebSocket.h +++ b/wpinet/src/main/native/include/wpinet/WebSocket.h @@ -42,8 +42,6 @@ class WebSocket : public std::enable_shared_from_this { static constexpr uint8_t kOpPong = 0x0A; static constexpr uint8_t kOpMask = 0x0F; static constexpr uint8_t kFlagFin = 0x80; - static constexpr uint8_t kFlagMasking = 0x80; - static constexpr uint8_t kLenMask = 0x7f; public: WebSocket(uv::Stream& stream, bool server, const private_init&); @@ -82,6 +80,26 @@ class WebSocket : public std::enable_shared_from_this { span> extraHeaders; }; + /** + * Frame. Used by SendFrames(). + */ + struct Frame { + static constexpr uint8_t kText = kFlagFin | kOpText; + static constexpr uint8_t kBinary = kFlagFin | kOpBinary; + static constexpr uint8_t kTextFragment = kOpText; + static constexpr uint8_t kBinaryFragment = kOpBinary; + static constexpr uint8_t kFragment = kOpCont; + static constexpr uint8_t kFinalFragment = kFlagFin | kOpCont; + static constexpr uint8_t kPing = kFlagFin | kOpPing; + static constexpr uint8_t kPong = kFlagFin | kOpPong; + + Frame(uint8_t opcode, span data) + : opcode{opcode}, data{data} {} + + uint8_t opcode; + span data; + }; + /** * Starts a client connection by performing the initial client handshake. * An open event is emitted when the handshake completes. @@ -363,6 +381,15 @@ class WebSocket : public std::enable_shared_from_this { SendPong({data.begin(), data.end()}, std::move(callback)); } + /** + * Send multiple frames. + * + * @param frames Frame type/data pairs + * @param callback Callback which is invoked when the write completes. + */ + void SendFrames(span frames, + std::function, uv::Error)> callback); + /** * Fail the connection. */ @@ -470,7 +497,9 @@ class WebSocket : public std::enable_shared_from_this { void SetClosed(uint16_t code, std::string_view reason, bool failed = false); void HandleIncoming(uv::Buffer& buf, size_t size); void Send(uint8_t opcode, span data, - std::function, uv::Error)> callback); + std::function, uv::Error)> callback) { + SendFrames({{Frame{opcode, data}}}, std::move(callback)); + } }; } // namespace wpi