// Copyright (c) FIRST and other WPILib contributors. // Open Source Software; you can modify and/or share it under the terms of // the WPILib BSD license file in the root directory of this project. #include "wpi/TCPConnector.h" // NOLINT(build/include_order) #include #include #include #include #include "wpi/SmallSet.h" #include "wpi/condition_variable.h" #include "wpi/mutex.h" using namespace wpi; // MSVC < 1900 doesn't have support for thread_local #if !defined(_MSC_VER) || _MSC_VER >= 1900 // clang check for availability of thread_local #if !defined(__has_feature) || __has_feature(cxx_thread_local) #define HAVE_THREAD_LOCAL #endif #endif std::unique_ptr TCPConnector::connect_parallel( span> servers, Logger& logger, int timeout) { if (servers.empty()) { return nullptr; } // structure to make sure we don't start duplicate workers struct GlobalState { wpi::mutex mtx; #ifdef HAVE_THREAD_LOCAL SmallSet, 16> active; #else SmallSet, 16> active; #endif }; #ifdef HAVE_THREAD_LOCAL thread_local auto global = std::make_shared(); #else static auto global = std::make_shared(); auto this_id = std::this_thread::get_id(); #endif auto local = global; // copy to an automatic variable for lambda capture // structure shared between threads and this function struct Result { wpi::mutex mtx; wpi::condition_variable cv; std::unique_ptr stream; std::atomic count{0}; std::atomic done{false}; }; auto result = std::make_shared(); // start worker threads; this is I/O bound so we don't limit to # of procs Logger* plogger = &logger; unsigned int num_workers = 0; for (const auto& server : servers) { std::pair server_copy{std::string{server.first}, server.second}; #ifdef HAVE_THREAD_LOCAL const auto& active_tracker = server_copy; #else std::tuple active_tracker{ this_id, server_copy.first, server_copy.second}; #endif // don't start a new worker if we had a previously still-active connection // attempt to the same server { std::scoped_lock lock(local->mtx); if (local->active.count(active_tracker) > 0) { continue; // already in set } } ++num_workers; // start the worker std::thread([=] { if (!result->done) { // add to global state { std::scoped_lock lock(local->mtx); local->active.insert(active_tracker); } // try to connect auto stream = connect(server_copy.first.c_str(), server_copy.second, *plogger, timeout); // remove from global state { std::scoped_lock lock(local->mtx); local->active.erase(active_tracker); } // successful connection if (stream) { std::scoped_lock lock(result->mtx); if (!result->done.exchange(true)) { result->stream = std::move(stream); } } } ++result->count; result->cv.notify_all(); }).detach(); } // wait for a result, timeout, or all finished std::unique_lock lock(result->mtx); if (timeout == 0) { result->cv.wait( lock, [&] { return result->stream || result->count >= num_workers; }); } else { auto timeout_time = std::chrono::steady_clock::now() + std::chrono::seconds(timeout); result->cv.wait_until(lock, timeout_time, [&] { return result->stream || result->count >= num_workers; }); } // no need to wait for remaining worker threads; shared_ptr will clean up return std::move(result->stream); }