diff --git a/wpiutil/examples/parallelconnect/parallelconnect.cpp b/wpiutil/examples/parallelconnect/parallelconnect.cpp new file mode 100644 index 0000000000..929c84716e --- /dev/null +++ b/wpiutil/examples/parallelconnect/parallelconnect.cpp @@ -0,0 +1,50 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include + +#include "wpi/EventLoopRunner.h" +#include "wpi/Logger.h" +#include "wpi/ParallelTcpConnector.h" +#include "wpi/uv/Error.h" +#include "wpi/uv/Tcp.h" + +namespace uv = wpi::uv; + +static void logfunc(unsigned int level, const char* file, unsigned int line, + const char* msg) { + std::fprintf(stderr, "(%d) %s:%d: %s\n", level, file, line, msg); +} + +int main() { + wpi::Logger logger{logfunc, 0}; + + // Kick off the event loop on a separate thread + wpi::EventLoopRunner loop; + std::shared_ptr connect; + loop.ExecAsync([&](uv::Loop& loop) { + connect = wpi::ParallelTcpConnector::Create( + loop, uv::Timer::Time{2000}, logger, [&](uv::Tcp& tcp) { + std::fputs("Got connection, accepting!\n", stdout); + tcp.StartRead(); + connect->Succeeded(tcp); + tcp.end.connect([&] { + std::fputs("TCP connection ended, disconnecting!\n", stdout); + tcp.Close(); + connect->Disconnected(); + }); + tcp.error.connect([&](uv::Error) { + std::fputs("TCP error, disconnecting!\n", stdout); + connect->Disconnected(); + }); + }); + connect->SetServers({{{"roborio-294-frc.local", 8080}, + {"roborio-294-frc.frc-field.local", 8080}, + {"10.2.94.2", 8080}, + {"127.0.0.1", 8080}}}); + }); + + // wait for a keypress to terminate + std::getchar(); +} diff --git a/wpiutil/src/main/native/cpp/ParallelTcpConnector.cpp b/wpiutil/src/main/native/cpp/ParallelTcpConnector.cpp new file mode 100644 index 0000000000..5a8394ae27 --- /dev/null +++ b/wpiutil/src/main/native/cpp/ParallelTcpConnector.cpp @@ -0,0 +1,177 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "wpi/ParallelTcpConnector.h" + +#include + +#include "wpi/Logger.h" +#include "wpi/uv/GetAddrInfo.h" +#include "wpi/uv/Loop.h" +#include "wpi/uv/Tcp.h" +#include "wpi/uv/Timer.h" +#include "wpi/uv/util.h" + +using namespace wpi; + +ParallelTcpConnector::ParallelTcpConnector( + wpi::uv::Loop& loop, wpi::uv::Timer::Time reconnectRate, + wpi::Logger& logger, std::function connected, + const private_init&) + : m_loop{loop}, + m_logger{logger}, + m_reconnectRate{reconnectRate}, + m_connected{std::move(connected)}, + m_reconnectTimer{uv::Timer::Create(loop)} { + m_reconnectTimer->timeout.connect([this] { + if (!IsConnected()) { + WPI_DEBUG1(m_logger, "{}", "timed out, reconnecting"); + Connect(); + } + }); +} + +ParallelTcpConnector::~ParallelTcpConnector() = default; + +void ParallelTcpConnector::Close() { + CancelAll(); + m_reconnectTimer->Close(); +} + +void ParallelTcpConnector::SetServers( + wpi::span> servers) { + m_servers.assign(servers.begin(), servers.end()); + if (!IsConnected()) { + Connect(); + } +} + +void ParallelTcpConnector::Disconnected() { + if (m_isConnected) { + m_isConnected = false; + Connect(); + } +} + +void ParallelTcpConnector::Succeeded(uv::Tcp& tcp) { + if (!m_isConnected) { + m_isConnected = true; + m_reconnectTimer->Stop(); + CancelAll(&tcp); + } +} + +void ParallelTcpConnector::Connect() { + if (IsConnected()) { + return; + } + + CancelAll(); + m_reconnectTimer->Start(m_reconnectRate); + + WPI_DEBUG3(m_logger, "{}", "starting new connection attempts"); + + // kick off parallel lookups + for (auto&& server : m_servers) { + auto req = std::make_shared(); + m_resolvers.emplace_back(req); + + req->resolved.connect( + [this, req = req.get()](const addrinfo& addrinfo) { + if (IsConnected()) { + return; + } + + // kick off parallel connection attempts + for (auto ai = &addrinfo; ai; ai = ai->ai_next) { + auto tcp = uv::Tcp::Create(m_loop); + m_attempts.emplace_back(tcp); + + auto connreq = std::make_shared(); + connreq->connected.connect( + [this, tcp = tcp.get()] { + if (m_logger.min_level() <= wpi::WPI_LOG_DEBUG4) { + std::string ip; + unsigned int port = 0; + uv::AddrToName(tcp->GetPeer(), &ip, &port); + WPI_DEBUG4(m_logger, + "successful connection ({}) to {} port {}", + static_cast(tcp), ip, port); + } + if (IsConnected()) { + tcp->Shutdown([tcp] { tcp->Close(); }); + return; + } + if (m_connected) { + m_connected(*tcp); + } + }, + shared_from_this()); + + connreq->error = [selfWeak = weak_from_this(), + tcp = tcp.get()](uv::Error err) { + if (auto self = selfWeak.lock()) { + WPI_DEBUG1(self->m_logger, "connect failure ({}): {}", + static_cast(tcp), err.str()); + } + }; + + if (m_logger.min_level() <= wpi::WPI_LOG_DEBUG4) { + std::string ip; + unsigned int port = 0; + uv::AddrToName(*reinterpret_cast(ai->ai_addr), + &ip, &port); + WPI_DEBUG4( + m_logger, + "Info({}) starting connection attempt ({}) to {} port {}", + static_cast(req), static_cast(tcp.get()), ip, + port); + } + tcp->Connect(*ai->ai_addr, connreq); + } + }, + shared_from_this()); + + req->error = [req = req.get(), selfWeak = weak_from_this()](uv::Error err) { + if (auto self = selfWeak.lock()) { + WPI_DEBUG1(self->m_logger, "GetAddrInfo({}) failure: {}", + static_cast(req), err.str()); + } + }; + + WPI_DEBUG4(m_logger, "starting GetAddrInfo({}) for {} port {}", + static_cast(req.get()), server.first, server.second); + addrinfo hints; + std::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG; + uv::GetAddrInfo(m_loop, req, server.first, fmt::format("{}", server.second), + &hints); + } +} + +void ParallelTcpConnector::CancelAll(wpi::uv::Tcp* except) { + WPI_DEBUG4(m_logger, "{}", "canceling previous attempts"); + for (auto&& resolverWeak : m_resolvers) { + if (auto resolver = resolverWeak.lock()) { + WPI_DEBUG4(m_logger, "canceling GetAddrInfo({})", + static_cast(resolver.get())); + resolver->Cancel(); + } + } + m_resolvers.clear(); + + for (auto&& tcpWeak : m_attempts) { + if (auto tcp = tcpWeak.lock()) { + if (tcp.get() != except) { + WPI_DEBUG4(m_logger, "canceling connection attempt ({})", + static_cast(tcp.get())); + tcp->Close(); + } + } + } + m_attempts.clear(); +} diff --git a/wpiutil/src/main/native/include/wpi/ParallelTcpConnector.h b/wpiutil/src/main/native/include/wpi/ParallelTcpConnector.h new file mode 100644 index 0000000000..47c29d0220 --- /dev/null +++ b/wpiutil/src/main/native/include/wpi/ParallelTcpConnector.h @@ -0,0 +1,119 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include "wpi/span.h" +#include "wpi/uv/Timer.h" + +namespace wpi { + +class Logger; + +namespace uv { +class GetAddrInfoReq; +class Loop; +class Tcp; +class Timer; +} // namespace uv + +/** + * Parallel TCP connector. Attempts parallel resolution and connection to + * multiple servers with automatic retry if none connect. + * + * Each successful TCP connection results in a call to the connected callback. + * For correct operation, the consuming code (either the connected callback or + * e.g. task it starts) must call Succeeded() to indicate if the connection has + * succeeded prior to the reconnect rate timeout. A successful connection + * results in the connector terminating all other connection attempts. + * + * After the reconnect rate times out, all remaining active connection attempts + * are canceled and new ones started. + */ +class ParallelTcpConnector + : public std::enable_shared_from_this { + struct private_init {}; + + public: + /** + * Create. + * + * @param loop loop + * @param reconnectRate how long to wait after starting connection attempts + * to cancel and attempt connecting again + * @param logger logger + * @param connected callback function when a connection succeeds; may be + * called multiple times if it does not call Succeeded() + * before returning + * @return Parallel connector + */ + static std::shared_ptr Create( + wpi::uv::Loop& loop, wpi::uv::Timer::Time reconnectRate, + wpi::Logger& logger, std::function connected) { + return std::make_shared( + loop, reconnectRate, logger, std::move(connected), private_init{}); + } + + ParallelTcpConnector(wpi::uv::Loop& loop, wpi::uv::Timer::Time reconnectRate, + wpi::Logger& logger, + std::function connected, + const private_init&); + ~ParallelTcpConnector(); + + ParallelTcpConnector(const ParallelTcpConnector&) = delete; + ParallelTcpConnector& operator=(const ParallelTcpConnector&) = delete; + + /** + * Closes resources, canceling all pending action attempts. + */ + void Close(); + + /** + * Changes the servers/ports to connect to. Starts connection attempts if not + * already connected. + * + * @param servers array of server/port pairs + */ + void SetServers( + wpi::span> servers); + + /** + * Tells the parallel connector that the current connection has terminated and + * it is necessary to start reconnection attempts. + */ + void Disconnected(); + + /** + * Tells the parallel connector that a particular connection has succeeded and + * it should stop trying to connect. + * + * @param tcp connection passed to connected callback + */ + void Succeeded(wpi::uv::Tcp& tcp); + + private: + bool IsConnected() const { return m_isConnected || m_servers.empty(); } + void Connect(); + void CancelAll(wpi::uv::Tcp* except = nullptr); + + wpi::uv::Loop& m_loop; + wpi::Logger& m_logger; + wpi::uv::Timer::Time m_reconnectRate; + std::function m_connected; + std::shared_ptr m_reconnectTimer; + std::vector> m_servers; + std::vector> m_resolvers; + std::vector> m_attempts; + bool m_isConnected{false}; +}; + +} // namespace wpi