// Copyright (c) FIRST and other WPILib contributors. // Open Source Software; you can modify and/or share it under the terms of // the WPILib BSD license file in the root directory of this project. #include "wpinet/uv/Stream.h" #include using namespace wpi; using namespace wpi::uv; namespace { class CallbackWriteReq : public WriteReq { public: CallbackWriteReq(std::span bufs, std::function, Error)> callback) : m_bufs{bufs.begin(), bufs.end()} { finish.connect( [this, f = std::move(callback)](Error err) { f(m_bufs, err); }); } private: SmallVector m_bufs; }; } // namespace namespace wpi::uv { ShutdownReq::ShutdownReq() { error = [this](Error err) { GetStream().error(err); }; } WriteReq::WriteReq() { error = [this](Error err) { GetStream().error(err); }; } void Stream::Shutdown(const std::shared_ptr& req) { if (IsLoopClosing()) { return; } if (Invoke(&uv_shutdown, req->GetRaw(), GetRawStream(), [](uv_shutdown_t* req, int status) { auto& h = *static_cast(req->data); if (status < 0) { h.ReportError(status); } else { h.complete(); } h.Release(); // this is always a one-shot })) { req->Keep(); } } void Stream::Shutdown(std::function callback) { if (IsLoopClosing()) { return; } auto req = std::make_shared(); if (callback) { req->complete.connect(std::move(callback)); } Shutdown(req); } void Stream::StartRead() { if (IsLoopClosing()) { return; } Invoke(&uv_read_start, GetRawStream(), &Handle::AllocBuf, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { auto& h = *static_cast(stream->data); Buffer data = *buf; // nread=0 is simply ignored if (nread == UV_EOF) { h.end(); } else if (nread > 0) { h.data(data, static_cast(nread)); } else if (nread < 0) { h.ReportError(nread); } // free the buffer h.FreeBuf(data); }); } void Stream::Write(std::span bufs, const std::shared_ptr& req) { if (IsLoopClosing()) { return; } if (Invoke(&uv_write, req->GetRaw(), GetRawStream(), bufs.data(), bufs.size(), [](uv_write_t* r, int status) { auto& h = *static_cast(r->data); if (status < 0) { h.ReportError(status); } h.finish(Error(status)); h.Release(); // this is always a one-shot })) { req->Keep(); } } void Stream::Write(std::span bufs, std::function, Error)> callback) { Write(bufs, std::make_shared(bufs, std::move(callback))); } int Stream::TryWrite(std::span bufs) { if (IsLoopClosing()) { return 0; } int val = uv_try_write(GetRawStream(), bufs.data(), bufs.size()); if (val < 0) { this->ReportError(val); return 0; } return val; } int Stream::TryWrite2(std::span bufs, Stream& send) { if (IsLoopClosing()) { return 0; } int val = uv_try_write2(GetRawStream(), bufs.data(), bufs.size(), send.GetRawStream()); if (val < 0) { this->ReportError(val); return 0; } return val; } } // namespace wpi::uv