[wpinet] WebSocket: Add SendFrames() (#4445)

This commit is contained in:
Peter Johnson
2022-10-03 06:04:08 -07:00
committed by GitHub
parent ceaf493811
commit 5c067d30a0
2 changed files with 114 additions and 35 deletions

View File

@@ -11,6 +11,7 @@
#include <wpi/SmallString.h>
#include <wpi/SmallVector.h>
#include <wpi/StringExtras.h>
#include <wpi/raw_ostream.h>
#include <wpi/sha1.h>
#include "wpinet/HttpParser.h"
@@ -26,20 +27,22 @@ class WebSocketWriteReq : public uv::WriteReq {
std::function<void(span<uv::Buffer>, uv::Error)> callback)
: m_callback{std::move(callback)} {
finish.connect([this](uv::Error err) {
span<uv::Buffer> bufs{m_bufs};
for (auto&& buf : bufs.subspan(0, m_startUser)) {
for (auto&& buf : m_internalBufs) {
buf.Deallocate();
}
m_callback(bufs.subspan(m_startUser), err);
m_callback(m_userBufs, err);
});
}
std::function<void(span<uv::Buffer>, uv::Error)> m_callback;
SmallVector<uv::Buffer, 4> m_bufs;
size_t m_startUser;
SmallVector<uv::Buffer, 4> m_internalBufs;
SmallVector<uv::Buffer, 4> m_userBufs;
};
} // namespace
static constexpr uint8_t kFlagMasking = 0x80;
static constexpr uint8_t kLenMask = 0x7f;
class WebSocket::ClientHandshakeData {
public:
ClientHandshakeData() {
@@ -502,6 +505,12 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
return Fail(1002, "incomplete fragment");
}
if (!m_combineFragments || fin) {
#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG
fmt::print(
"WS RecvText({})\n",
std::string_view{reinterpret_cast<char*>(m_payload.data()),
m_payload.size()});
#endif
text(std::string_view{reinterpret_cast<char*>(m_payload.data()),
m_payload.size()},
fin);
@@ -515,6 +524,15 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
return Fail(1002, "incomplete fragment");
}
if (!m_combineFragments || fin) {
#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG
SmallString<128> str;
raw_svector_ostream stros{str};
for (auto ch : m_payload) {
stros << fmt::format("{:02x},",
static_cast<unsigned int>(ch) & 0xff);
}
fmt::print("WS RecvBinary({})\n", str.str());
#endif
binary(m_payload, fin);
}
if (!fin) {
@@ -576,24 +594,30 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
}
}
void WebSocket::Send(
uint8_t opcode, span<const uv::Buffer> data,
std::function<void(span<uv::Buffer>, uv::Error)> callback) {
// If we're not open, emit an error and don't send the data
if (m_state != OPEN) {
int err;
if (m_state == CONNECTING) {
err = UV_EAGAIN;
} else {
err = UV_ESHUTDOWN;
}
SmallVector<uv::Buffer, 4> bufs{data.begin(), data.end()};
callback(bufs, uv::Error{err});
return;
}
static void WriteFrame(WebSocketWriteReq& req,
SmallVectorImpl<uv::Buffer>& bufs, bool server,
uint8_t opcode, span<const uv::Buffer> data) {
SmallVector<uv::Buffer, 4> internalBufs;
raw_uv_ostream os{internalBufs, 4096};
auto req = std::make_shared<WebSocketWriteReq>(std::move(callback));
raw_uv_ostream os{req->m_bufs, 4096};
#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG
if ((opcode & 0x7f) == 0x01) {
SmallString<128> str;
for (auto&& d : data) {
str.append(std::string_view(d.base, d.len));
}
fmt::print("WS SendText({})\n", str.str());
} else if ((opcode & 0x7f) == 0x02) {
SmallString<128> str;
raw_svector_ostream stros{str};
for (auto&& d : data) {
for (auto ch : d.data()) {
stros << fmt::format("{:02x},", static_cast<unsigned int>(ch) & 0xff);
}
}
fmt::print("WS SendBinary({})\n", str.str());
}
#endif
// opcode (includes FIN bit)
os << static_cast<unsigned char>(opcode);
@@ -604,14 +628,14 @@ void WebSocket::Send(
size += buf.len;
}
if (size < 126) {
os << static_cast<unsigned char>((m_server ? 0x00 : kFlagMasking) | size);
os << static_cast<unsigned char>((server ? 0x00 : kFlagMasking) | size);
} else if (size <= 0xffff) {
os << static_cast<unsigned char>((m_server ? 0x00 : kFlagMasking) | 126);
os << static_cast<unsigned char>((server ? 0x00 : kFlagMasking) | 126);
const uint8_t sizeMsb[] = {static_cast<uint8_t>((size >> 8) & 0xff),
static_cast<uint8_t>(size & 0xff)};
os << span{sizeMsb};
} else {
os << static_cast<unsigned char>((m_server ? 0x00 : kFlagMasking) | 127);
os << static_cast<unsigned char>((server ? 0x00 : kFlagMasking) | 127);
const uint8_t sizeMsb[] = {static_cast<uint8_t>((size >> 56) & 0xff),
static_cast<uint8_t>((size >> 48) & 0xff),
static_cast<uint8_t>((size >> 40) & 0xff),
@@ -624,7 +648,7 @@ void WebSocket::Send(
}
// clients need to mask the input data
if (!m_server) {
if (!server) {
// generate masking key
static std::random_device rd;
static std::default_random_engine gen{rd()};
@@ -644,14 +668,40 @@ void WebSocket::Send(
}
}
}
req->m_startUser = req->m_bufs.size();
req->m_bufs.append(data.begin(), data.end());
bufs.append(internalBufs.begin(), internalBufs.end());
// don't send the user bufs as we copied their data
m_stream.Write(span{req->m_bufs}.subspan(0, req->m_startUser), req);
} else {
bufs.append(internalBufs.begin(), internalBufs.end());
// servers can just send the buffers directly without masking
req->m_startUser = req->m_bufs.size();
req->m_bufs.append(data.begin(), data.end());
m_stream.Write(req->m_bufs, req);
bufs.append(data.begin(), data.end());
}
req.m_internalBufs.append(internalBufs.begin(), internalBufs.end());
req.m_userBufs.append(data.begin(), data.end());
}
void WebSocket::SendFrames(
span<const Frame> frames,
std::function<void(span<uv::Buffer>, uv::Error)> callback) {
// If we're not open, emit an error and don't send the data
if (m_state != OPEN) {
int err;
if (m_state == CONNECTING) {
err = UV_EAGAIN;
} else {
err = UV_ESHUTDOWN;
}
SmallVector<uv::Buffer, 4> bufs;
for (auto&& frame : frames) {
bufs.append(frame.data.begin(), frame.data.end());
}
callback(bufs, uv::Error{err});
return;
}
auto req = std::make_shared<WebSocketWriteReq>(std::move(callback));
SmallVector<uv::Buffer, 4> bufs;
for (auto&& frame : frames) {
WriteFrame(*req, bufs, m_server, frame.opcode, frame.data);
}
m_stream.Write(bufs, req);
}