mirror of
https://github.com/wpilibsuite/allwpilib
synced 2026-06-20 00:51:42 +00:00
129 lines
4.1 KiB
C++
129 lines
4.1 KiB
C++
|
|
/*----------------------------------------------------------------------------*/
|
||
|
|
/* Copyright (c) FIRST 2017. All Rights Reserved. */
|
||
|
|
/* Open Source Software - may be modified and shared by FRC teams. The code */
|
||
|
|
/* must be accompanied by the FIRST BSD license file in the root directory of */
|
||
|
|
/* the project. */
|
||
|
|
/*----------------------------------------------------------------------------*/
|
||
|
|
|
||
|
|
#include "tcpsockets/TCPConnector.h"
|
||
|
|
|
||
|
|
#include <atomic>
|
||
|
|
#include <chrono>
|
||
|
|
#include <condition_variable>
|
||
|
|
#include <mutex>
|
||
|
|
#include <thread>
|
||
|
|
#include <tuple>
|
||
|
|
|
||
|
|
#include "llvm/SmallSet.h"
|
||
|
|
|
||
|
|
using namespace wpi;
|
||
|
|
|
||
|
|
// MSVC < 1900 doesn't have support for thread_local
|
||
|
|
#if !defined(_MSC_VER) || _MSC_VER >= 1900
|
||
|
|
// clang check for availability of thread_local
|
||
|
|
#if !defined(__has_feature) || __has_feature(cxx_thread_local)
|
||
|
|
#define HAVE_THREAD_LOCAL
|
||
|
|
#endif
|
||
|
|
#endif
|
||
|
|
|
||
|
|
std::unique_ptr<NetworkStream> TCPConnector::connect_parallel(
|
||
|
|
llvm::ArrayRef<std::pair<const char*, int>> servers, Logger& logger,
|
||
|
|
int timeout) {
|
||
|
|
if (servers.empty()) return nullptr;
|
||
|
|
|
||
|
|
// structure to make sure we don't start duplicate workers
|
||
|
|
struct GlobalState {
|
||
|
|
std::mutex mutex;
|
||
|
|
#ifdef HAVE_THREAD_LOCAL
|
||
|
|
llvm::SmallSet<std::pair<std::string, int>, 16> active;
|
||
|
|
#else
|
||
|
|
llvm::SmallSet<std::tuple<std::thread::id, std::string, int>, 16> active;
|
||
|
|
#endif
|
||
|
|
};
|
||
|
|
#ifdef HAVE_THREAD_LOCAL
|
||
|
|
thread_local auto global = std::make_shared<GlobalState>();
|
||
|
|
#else
|
||
|
|
static auto global = std::make_shared<GlobalState>();
|
||
|
|
auto this_id = std::this_thread::get_id();
|
||
|
|
#endif
|
||
|
|
auto local = global; // copy to an automatic variable for lambda capture
|
||
|
|
|
||
|
|
// structure shared between threads and this function
|
||
|
|
struct Result {
|
||
|
|
std::mutex mutex;
|
||
|
|
std::condition_variable cv;
|
||
|
|
std::unique_ptr<NetworkStream> stream;
|
||
|
|
std::atomic<unsigned int> count{0};
|
||
|
|
std::atomic<bool> done{false};
|
||
|
|
};
|
||
|
|
auto result = std::make_shared<Result>();
|
||
|
|
|
||
|
|
// start worker threads; this is I/O bound so we don't limit to # of procs
|
||
|
|
Logger* plogger = &logger;
|
||
|
|
unsigned int num_workers = 0;
|
||
|
|
for (const auto& server : servers) {
|
||
|
|
std::pair<std::string, int> server_copy{std::string{server.first},
|
||
|
|
server.second};
|
||
|
|
#ifdef HAVE_THREAD_LOCAL
|
||
|
|
const auto& active_tracker = server_copy;
|
||
|
|
#else
|
||
|
|
std::tuple<std::thread::id, std::string, int> active_tracker{
|
||
|
|
this_id, server_copy.first, server_copy.second};
|
||
|
|
#endif
|
||
|
|
|
||
|
|
// don't start a new worker if we had a previously still-active connection
|
||
|
|
// attempt to the same server
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(local->mutex);
|
||
|
|
if (local->active.count(active_tracker) > 0) continue; // already in set
|
||
|
|
}
|
||
|
|
|
||
|
|
++num_workers;
|
||
|
|
|
||
|
|
// start the worker
|
||
|
|
std::thread([=]() {
|
||
|
|
if (!result->done) {
|
||
|
|
// add to global state
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(local->mutex);
|
||
|
|
local->active.insert(active_tracker);
|
||
|
|
}
|
||
|
|
|
||
|
|
// try to connect
|
||
|
|
auto stream = connect(server_copy.first.c_str(), server_copy.second,
|
||
|
|
*plogger, timeout);
|
||
|
|
|
||
|
|
// remove from global state
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(local->mutex);
|
||
|
|
local->active.erase(active_tracker);
|
||
|
|
}
|
||
|
|
|
||
|
|
// successful connection
|
||
|
|
if (stream) {
|
||
|
|
std::lock_guard<std::mutex> lock(result->mutex);
|
||
|
|
if (!result->done.exchange(true)) result->stream = std::move(stream);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
++result->count;
|
||
|
|
result->cv.notify_all();
|
||
|
|
}).detach();
|
||
|
|
}
|
||
|
|
|
||
|
|
// wait for a result, timeout, or all finished
|
||
|
|
std::unique_lock<std::mutex> lock(result->mutex);
|
||
|
|
if (timeout == 0) {
|
||
|
|
result->cv.wait(
|
||
|
|
lock, [&] { return result->stream || result->count >= num_workers; });
|
||
|
|
} else {
|
||
|
|
auto timeout_time =
|
||
|
|
std::chrono::steady_clock::now() + std::chrono::seconds(timeout);
|
||
|
|
result->cv.wait_until(lock, timeout_time, [&] {
|
||
|
|
return result->stream || result->count >= num_workers;
|
||
|
|
});
|
||
|
|
}
|
||
|
|
|
||
|
|
// no need to wait for remaining worker threads; shared_ptr will clean up
|
||
|
|
return std::move(result->stream);
|
||
|
|
}
|