[wpiutil] Separate third party libraries (#4190)

This commit is contained in:
PJ Reiniger
2022-06-18 11:08:31 -04:00
committed by GitHub
parent 6671f8d099
commit 787fe6e7a5
102 changed files with 165 additions and 124 deletions

View File

@@ -1,211 +0,0 @@
/*
TCPAcceptor.cpp
TCPAcceptor class definition. TCPAcceptor provides methods to passively
establish TCP/IP connections with clients.
------------------------------------------
Copyright (c) 2013 [Vic Hargrave - http://vichargrave.com]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "wpinet/TCPAcceptor.h"
#include <cstdio>
#include <cstring>
#ifdef _WIN32
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <WinSock2.h>
#include <Ws2tcpip.h>
#pragma comment(lib, "Ws2_32.lib")
#else
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <unistd.h>
#endif
#include <wpi/Logger.h>
#include <wpi/SmallString.h>
#include "wpinet/SocketError.h"
using namespace wpi;
TCPAcceptor::TCPAcceptor(int port, std::string_view address, Logger& logger)
: m_lsd(0),
m_port(port),
m_address(address),
m_listening(false),
m_logger(logger) {
m_shutdown = false;
#ifdef _WIN32
WSAData wsaData;
WORD wVersionRequested = MAKEWORD(2, 2);
(void)WSAStartup(wVersionRequested, &wsaData);
#endif
}
TCPAcceptor::~TCPAcceptor() {
if (m_lsd > 0) {
shutdown();
#ifdef _WIN32
closesocket(m_lsd);
#else
close(m_lsd);
#endif
}
#ifdef _WIN32
WSACleanup();
#endif
}
int TCPAcceptor::start() {
if (m_listening) {
return 0;
}
m_lsd = socket(PF_INET, SOCK_STREAM, 0);
if (m_lsd < 0) {
WPI_ERROR(m_logger, "{}", "could not create socket");
return -1;
}
struct sockaddr_in address;
std::memset(&address, 0, sizeof(address));
address.sin_family = PF_INET;
if (m_address.size() > 0) {
#ifdef _WIN32
SmallString<128> addr_copy(m_address);
addr_copy.push_back('\0');
int res = InetPton(PF_INET, addr_copy.data(), &(address.sin_addr));
#else
int res = inet_pton(PF_INET, m_address.c_str(), &(address.sin_addr));
#endif
if (res != 1) {
WPI_ERROR(m_logger, "could not resolve {} address", m_address);
return -1;
}
} else {
address.sin_addr.s_addr = INADDR_ANY;
}
address.sin_port = htons(m_port);
#ifdef _WIN32
int optval = 1;
setsockopt(m_lsd, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
reinterpret_cast<char*>(&optval), sizeof optval);
#else
int optval = 1;
setsockopt(m_lsd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&optval),
sizeof optval);
#endif
int result = bind(m_lsd, reinterpret_cast<struct sockaddr*>(&address),
sizeof(address));
if (result != 0) {
WPI_ERROR(m_logger, "bind() to port {} failed: {}", m_port,
SocketStrerror());
return result;
}
result = listen(m_lsd, 5);
if (result != 0) {
WPI_ERROR(m_logger, "listen() on port {} failed: {}", m_port,
SocketStrerror());
return result;
}
m_listening = true;
return result;
}
void TCPAcceptor::shutdown() {
m_shutdown = true;
#ifdef _WIN32
::shutdown(m_lsd, SD_BOTH);
// this is ugly, but the easiest way to do this
// force wakeup of accept() with a non-blocking connect to ourselves
struct sockaddr_in address;
std::memset(&address, 0, sizeof(address));
address.sin_family = PF_INET;
SmallString<128> addr_copy;
if (m_address.size() > 0)
addr_copy = m_address;
else
addr_copy = "127.0.0.1";
addr_copy.push_back('\0');
int size = sizeof(address);
if (WSAStringToAddress(addr_copy.data(), PF_INET, nullptr,
(struct sockaddr*)&address, &size) != 0)
return;
address.sin_port = htons(m_port);
int result = -1, sd = socket(AF_INET, SOCK_STREAM, 0);
if (sd < 0)
return;
// Set socket to non-blocking
u_long mode = 1;
ioctlsocket(sd, FIONBIO, &mode);
// Try to connect
::connect(sd, (struct sockaddr*)&address, sizeof(address));
// Close
::closesocket(sd);
#else
::shutdown(m_lsd, SHUT_RDWR);
int nullfd = ::open("/dev/null", O_RDONLY);
if (nullfd >= 0) {
::dup2(nullfd, m_lsd);
::close(nullfd);
}
#endif
}
std::unique_ptr<NetworkStream> TCPAcceptor::accept() {
if (!m_listening || m_shutdown) {
return nullptr;
}
struct sockaddr_in address;
#ifdef _WIN32
int len = sizeof(address);
#else
socklen_t len = sizeof(address);
#endif
std::memset(&address, 0, sizeof(address));
int sd = ::accept(m_lsd, reinterpret_cast<struct sockaddr*>(&address), &len);
if (sd < 0) {
if (!m_shutdown) {
WPI_ERROR(m_logger, "accept() on port {} failed: {}", m_port,
SocketStrerror());
}
return nullptr;
}
if (m_shutdown) {
#ifdef _WIN32
closesocket(sd);
#else
close(sd);
#endif
return nullptr;
}
return std::unique_ptr<NetworkStream>(new TCPStream(sd, &address));
}

View File

@@ -1,219 +0,0 @@
/*
TCPConnector.h
TCPConnector class definition. TCPConnector provides methods to actively
establish TCP/IP connections with a server.
------------------------------------------
Copyright (c) 2013 [Vic Hargrave - http://vichargrave.com]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License
*/
#include "wpinet/TCPConnector.h"
#include <fcntl.h>
#include <cerrno>
#include <cstdio>
#include <cstring>
#ifdef _WIN32
#include <WS2tcpip.h>
#include <WinSock2.h>
#else
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <unistd.h>
#endif
#include <wpi/Logger.h>
#include <wpi/SmallString.h>
#include "wpinet/SocketError.h"
#include "wpinet/TCPStream.h"
using namespace wpi;
static int ResolveHostName(const char* hostname, struct in_addr* addr) {
struct addrinfo hints;
struct addrinfo* res;
hints.ai_flags = 0;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
hints.ai_addrlen = 0;
hints.ai_addr = nullptr;
hints.ai_canonname = nullptr;
hints.ai_next = nullptr;
int result = getaddrinfo(hostname, nullptr, &hints, &res);
if (result == 0) {
std::memcpy(
addr, &(reinterpret_cast<struct sockaddr_in*>(res->ai_addr)->sin_addr),
sizeof(struct in_addr));
freeaddrinfo(res);
}
return result;
}
std::unique_ptr<NetworkStream> TCPConnector::connect(const char* server,
int port, Logger& logger,
int timeout) {
#ifdef _WIN32
struct WSAHelper {
WSAHelper() {
WSAData wsaData;
WORD wVersionRequested = MAKEWORD(2, 2);
WSAStartup(wVersionRequested, &wsaData);
}
~WSAHelper() { WSACleanup(); }
};
static WSAHelper helper;
#endif
struct sockaddr_in address;
std::memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
if (ResolveHostName(server, &(address.sin_addr)) != 0) {
#ifdef _WIN32
SmallString<128> addr_copy(server);
addr_copy.push_back('\0');
int res = InetPton(PF_INET, addr_copy.data(), &(address.sin_addr));
#else
int res = inet_pton(PF_INET, server, &(address.sin_addr));
#endif
if (res != 1) {
WPI_ERROR(logger, "could not resolve {} address", server);
return nullptr;
}
}
address.sin_port = htons(port);
if (timeout == 0) {
int sd = socket(AF_INET, SOCK_STREAM, 0);
if (sd < 0) {
WPI_ERROR(logger, "{}", "could not create socket");
return nullptr;
}
if (::connect(sd, reinterpret_cast<struct sockaddr*>(&address),
sizeof(address)) != 0) {
WPI_ERROR(logger, "connect() to {} port {} failed: {}", server, port,
SocketStrerror());
#ifdef _WIN32
closesocket(sd);
#else
::close(sd);
#endif
return nullptr;
}
return std::unique_ptr<NetworkStream>(new TCPStream(sd, &address));
}
fd_set sdset;
struct timeval tv;
socklen_t len;
int result = -1, valopt, sd = socket(AF_INET, SOCK_STREAM, 0);
if (sd < 0) {
WPI_ERROR(logger, "{}", "could not create socket");
return nullptr;
}
// Set socket to non-blocking
#ifdef _WIN32
u_long mode = 1;
if (ioctlsocket(sd, FIONBIO, &mode) == SOCKET_ERROR)
WPI_WARNING(logger, "could not set socket to non-blocking: {}",
SocketStrerror());
#else
int arg;
arg = fcntl(sd, F_GETFL, nullptr);
if (arg < 0) {
WPI_WARNING(logger, "could not set socket to non-blocking: {}",
SocketStrerror());
} else {
arg |= O_NONBLOCK;
if (fcntl(sd, F_SETFL, arg) < 0) {
WPI_WARNING(logger, "could not set socket to non-blocking: {}",
SocketStrerror());
}
}
#endif
// Connect with time limit
if ((result = ::connect(sd, reinterpret_cast<struct sockaddr*>(&address),
sizeof(address))) < 0) {
int my_errno = SocketErrno();
#ifdef _WIN32
if (my_errno == WSAEWOULDBLOCK || my_errno == WSAEINPROGRESS) {
#else
if (my_errno == EWOULDBLOCK || my_errno == EINPROGRESS) {
#endif
tv.tv_sec = timeout;
tv.tv_usec = 0;
FD_ZERO(&sdset);
FD_SET(sd, &sdset);
if (select(sd + 1, nullptr, &sdset, nullptr, &tv) > 0) {
len = sizeof(int);
getsockopt(sd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&valopt),
&len);
if (valopt) {
WPI_ERROR(logger, "select() to {} port {} error {} - {}", server,
port, valopt, SocketStrerror(valopt));
} else {
// connection established
result = 0;
}
} else {
WPI_INFO(logger, "connect() to {} port {} timed out", server, port);
}
} else {
WPI_ERROR(logger, "connect() to {} port {} error {} - {}", server, port,
SocketErrno(), SocketStrerror());
}
}
// Return socket to blocking mode
#ifdef _WIN32
mode = 0;
if (ioctlsocket(sd, FIONBIO, &mode) == SOCKET_ERROR)
WPI_WARNING(logger, "could not set socket to blocking: {}",
SocketStrerror());
#else
arg = fcntl(sd, F_GETFL, nullptr);
if (arg < 0) {
WPI_WARNING(logger, "could not set socket to blocking: {}",
SocketStrerror());
} else {
arg &= (~O_NONBLOCK);
if (fcntl(sd, F_SETFL, arg) < 0) {
WPI_WARNING(logger, "could not set socket to blocking: {}",
SocketStrerror());
}
}
#endif
// Create stream object if connected, close if not.
if (result == -1) {
#ifdef _WIN32
closesocket(sd);
#else
::close(sd);
#endif
return nullptr;
}
return std::unique_ptr<NetworkStream>(new TCPStream(sd, &address));
}

View File

@@ -1,131 +0,0 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "wpinet/TCPConnector.h" // NOLINT(build/include_order)
#include <atomic>
#include <chrono>
#include <thread>
#include <tuple>
#include <wpi/SmallSet.h>
#include <wpi/condition_variable.h>
#include <wpi/mutex.h>
using namespace wpi;
// MSVC < 1900 doesn't have support for thread_local
#if !defined(_MSC_VER) || _MSC_VER >= 1900
// clang check for availability of thread_local
#if !defined(__has_feature) || __has_feature(cxx_thread_local)
#define HAVE_THREAD_LOCAL
#endif
#endif
std::unique_ptr<NetworkStream> TCPConnector::connect_parallel(
span<const std::pair<const char*, int>> servers, Logger& logger,
int timeout) {
if (servers.empty()) {
return nullptr;
}
// structure to make sure we don't start duplicate workers
struct GlobalState {
wpi::mutex mtx;
#ifdef HAVE_THREAD_LOCAL
SmallSet<std::pair<std::string, int>, 16> active;
#else
SmallSet<std::tuple<std::thread::id, std::string, int>, 16> active;
#endif
};
#ifdef HAVE_THREAD_LOCAL
thread_local auto global = std::make_shared<GlobalState>();
#else
static auto global = std::make_shared<GlobalState>();
auto this_id = std::this_thread::get_id();
#endif
auto local = global; // copy to an automatic variable for lambda capture
// structure shared between threads and this function
struct Result {
wpi::mutex mtx;
wpi::condition_variable cv;
std::unique_ptr<NetworkStream> stream;
std::atomic<unsigned int> count{0};
std::atomic<bool> done{false};
};
auto result = std::make_shared<Result>();
// start worker threads; this is I/O bound so we don't limit to # of procs
Logger* plogger = &logger;
unsigned int num_workers = 0;
for (const auto& server : servers) {
std::pair<std::string, int> server_copy{std::string{server.first},
server.second};
#ifdef HAVE_THREAD_LOCAL
const auto& active_tracker = server_copy;
#else
std::tuple<std::thread::id, std::string, int> active_tracker{
this_id, server_copy.first, server_copy.second};
#endif
// don't start a new worker if we had a previously still-active connection
// attempt to the same server
{
std::scoped_lock lock(local->mtx);
if (local->active.count(active_tracker) > 0) {
continue; // already in set
}
}
++num_workers;
// start the worker
std::thread([=] {
if (!result->done) {
// add to global state
{
std::scoped_lock lock(local->mtx);
local->active.insert(active_tracker);
}
// try to connect
auto stream = connect(server_copy.first.c_str(), server_copy.second,
*plogger, timeout);
// remove from global state
{
std::scoped_lock lock(local->mtx);
local->active.erase(active_tracker);
}
// successful connection
if (stream) {
std::scoped_lock lock(result->mtx);
if (!result->done.exchange(true)) {
result->stream = std::move(stream);
}
}
}
++result->count;
result->cv.notify_all();
}).detach();
}
// wait for a result, timeout, or all finished
std::unique_lock lock(result->mtx);
if (timeout == 0) {
result->cv.wait(
lock, [&] { return result->stream || result->count >= num_workers; });
} else {
auto timeout_time =
std::chrono::steady_clock::now() + std::chrono::seconds(timeout);
result->cv.wait_until(lock, timeout_time, [&] {
return result->stream || result->count >= num_workers;
});
}
// no need to wait for remaining worker threads; shared_ptr will clean up
return std::move(result->stream);
}

View File

@@ -1,230 +0,0 @@
/*
TCPStream.h
TCPStream class definition. TCPStream provides methods to transfer
data between peers over a TCP/IP connection.
------------------------------------------
Copyright (c) 2013 [Vic Hargrave - http://vichargrave.com]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "wpinet/TCPStream.h"
#include <fcntl.h>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <unistd.h>
#endif
#include <cerrno>
using namespace wpi;
TCPStream::TCPStream(int sd, sockaddr_in* address)
: m_sd(sd), m_blocking(true) {
char ip[50];
#ifdef _WIN32
InetNtop(PF_INET, &(address->sin_addr.s_addr), ip, sizeof(ip) - 1);
#else
inet_ntop(PF_INET, reinterpret_cast<in_addr*>(&(address->sin_addr.s_addr)),
ip, sizeof(ip) - 1);
#ifdef SO_NOSIGPIPE
// disable SIGPIPE on Mac OS X
int set = 1;
setsockopt(m_sd, SOL_SOCKET, SO_NOSIGPIPE, reinterpret_cast<char*>(&set),
sizeof set);
#endif
#endif
m_peerIP = ip;
m_peerPort = ntohs(address->sin_port);
}
TCPStream::~TCPStream() {
close();
}
size_t TCPStream::send(const char* buffer, size_t len, Error* err) {
if (m_sd < 0) {
*err = kConnectionClosed;
return 0;
}
#ifdef _WIN32
WSABUF wsaBuf;
wsaBuf.buf = const_cast<char*>(buffer);
wsaBuf.len = (ULONG)len;
DWORD rv;
bool result = true;
while (WSASend(m_sd, &wsaBuf, 1, &rv, 0, nullptr, nullptr) == SOCKET_ERROR) {
if (WSAGetLastError() != WSAEWOULDBLOCK) {
result = false;
break;
}
if (!m_blocking) {
*err = kWouldBlock;
return 0;
}
Sleep(1);
}
if (!result) {
char Buffer[128];
#ifdef _MSC_VER
sprintf_s(Buffer, "Send() failed: WSA error=%d\n", WSAGetLastError());
#else
std::snprintf(Buffer, sizeof(Buffer), "Send() failed: WSA error=%d\n",
WSAGetLastError());
#endif
OutputDebugStringA(Buffer);
*err = kConnectionReset;
return 0;
}
#else
#ifdef MSG_NOSIGNAL
// disable SIGPIPE on Linux
ssize_t rv = ::send(m_sd, buffer, len, MSG_NOSIGNAL);
#else
ssize_t rv = ::send(m_sd, buffer, len, 0);
#endif
if (rv < 0) {
if (!m_blocking && (errno == EAGAIN || errno == EWOULDBLOCK)) {
*err = kWouldBlock;
} else {
*err = kConnectionReset;
}
return 0;
}
#endif
return static_cast<size_t>(rv);
}
size_t TCPStream::receive(char* buffer, size_t len, Error* err, int timeout) {
if (m_sd < 0) {
*err = kConnectionClosed;
return 0;
}
#ifdef _WIN32
int rv;
#else
ssize_t rv;
#endif
if (timeout <= 0) {
#ifdef _WIN32
rv = recv(m_sd, buffer, len, 0);
#else
rv = read(m_sd, buffer, len);
#endif
} else if (WaitForReadEvent(timeout)) {
#ifdef _WIN32
rv = recv(m_sd, buffer, len, 0);
#else
rv = read(m_sd, buffer, len);
#endif
} else {
*err = kConnectionTimedOut;
return 0;
}
if (rv < 0) {
#ifdef _WIN32
if (!m_blocking && WSAGetLastError() == WSAEWOULDBLOCK) {
#else
if (!m_blocking && (errno == EAGAIN || errno == EWOULDBLOCK)) {
#endif
*err = kWouldBlock;
} else {
*err = kConnectionReset;
}
return 0;
}
return static_cast<size_t>(rv);
}
void TCPStream::close() {
if (m_sd >= 0) {
#ifdef _WIN32
::shutdown(m_sd, SD_BOTH);
closesocket(m_sd);
#else
::shutdown(m_sd, SHUT_RDWR);
::close(m_sd);
#endif
}
m_sd = -1;
}
std::string_view TCPStream::getPeerIP() const {
return m_peerIP;
}
int TCPStream::getPeerPort() const {
return m_peerPort;
}
void TCPStream::setNoDelay() {
if (m_sd < 0) {
return;
}
int optval = 1;
setsockopt(m_sd, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char*>(&optval),
sizeof optval);
}
bool TCPStream::setBlocking(bool enabled) {
if (m_sd < 0) {
return true; // silently accept
}
#ifdef _WIN32
u_long mode = enabled ? 0 : 1;
if (ioctlsocket(m_sd, FIONBIO, &mode) == SOCKET_ERROR) {
return false;
}
#else
int flags = fcntl(m_sd, F_GETFL, nullptr);
if (flags < 0) {
return false;
}
if (enabled) {
flags &= ~O_NONBLOCK;
} else {
flags |= O_NONBLOCK;
}
if (fcntl(m_sd, F_SETFL, flags) < 0) {
return false;
}
#endif
return true;
}
int TCPStream::getNativeHandle() const {
return m_sd;
}
bool TCPStream::WaitForReadEvent(int timeout) {
fd_set sdset;
struct timeval tv;
tv.tv_sec = timeout;
tv.tv_usec = 0;
FD_ZERO(&sdset);
FD_SET(m_sd, &sdset);
if (select(m_sd + 1, &sdset, nullptr, nullptr, &tv) > 0) {
return true;
}
return false;
}