2017-08-03 16:45:42 -05:00
|
|
|
/*----------------------------------------------------------------------------*/
|
2019-07-07 19:17:14 -07:00
|
|
|
/* Copyright (c) 2017-2019 FIRST. All Rights Reserved. */
|
2017-08-03 16:45:42 -05:00
|
|
|
/* Open Source Software - may be modified and shared by FRC teams. The code */
|
|
|
|
|
/* must be accompanied by the FIRST BSD license file in the root directory of */
|
|
|
|
|
/* the project. */
|
|
|
|
|
/*----------------------------------------------------------------------------*/
|
|
|
|
|
|
2018-04-29 23:33:19 -07:00
|
|
|
#include "wpi/TCPConnector.h" // NOLINT(build/include_order)
|
2017-08-03 16:45:42 -05:00
|
|
|
|
|
|
|
|
#include <atomic>
|
|
|
|
|
#include <chrono>
|
|
|
|
|
#include <thread>
|
|
|
|
|
#include <tuple>
|
|
|
|
|
|
2018-04-29 23:33:19 -07:00
|
|
|
#include "wpi/SmallSet.h"
|
|
|
|
|
#include "wpi/condition_variable.h"
|
|
|
|
|
#include "wpi/mutex.h"
|
2017-08-03 16:45:42 -05:00
|
|
|
|
|
|
|
|
using namespace wpi;
|
|
|
|
|
|
|
|
|
|
// Detect whether the toolchain supports the thread_local keyword and, if so,
// define HAVE_THREAD_LOCAL.
//
// MSVC < 1900 doesn't have support for thread_local.
#if !defined(_MSC_VER) || _MSC_VER >= 1900
// Clang-style feature check for availability of thread_local.  The
// __has_feature(...) call lives in its own #if, guarded by #ifdef, because a
// preprocessor that does not provide __has_feature still has to parse the
// call when it is written as "!defined(__has_feature) || __has_feature(...)"
// and rejects it as a syntax error.
#ifdef __has_feature
#if __has_feature(cxx_thread_local)
#define HAVE_THREAD_LOCAL
#endif
#else
// No feature-test operator available: assume thread_local is supported.
#define HAVE_THREAD_LOCAL
#endif
#endif
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<NetworkStream> TCPConnector::connect_parallel(
|
2018-04-29 23:33:19 -07:00
|
|
|
ArrayRef<std::pair<const char*, int>> servers, Logger& logger,
|
2017-08-03 16:45:42 -05:00
|
|
|
int timeout) {
|
|
|
|
|
if (servers.empty()) return nullptr;
|
|
|
|
|
|
|
|
|
|
// structure to make sure we don't start duplicate workers
|
|
|
|
|
struct GlobalState {
|
2017-11-12 22:12:03 -08:00
|
|
|
wpi::mutex mtx;
|
2017-08-03 16:45:42 -05:00
|
|
|
#ifdef HAVE_THREAD_LOCAL
|
2018-04-29 23:33:19 -07:00
|
|
|
SmallSet<std::pair<std::string, int>, 16> active;
|
2017-08-03 16:45:42 -05:00
|
|
|
#else
|
2018-04-29 23:33:19 -07:00
|
|
|
SmallSet<std::tuple<std::thread::id, std::string, int>, 16> active;
|
2017-08-03 16:45:42 -05:00
|
|
|
#endif
|
|
|
|
|
};
|
|
|
|
|
#ifdef HAVE_THREAD_LOCAL
|
|
|
|
|
thread_local auto global = std::make_shared<GlobalState>();
|
|
|
|
|
#else
|
|
|
|
|
static auto global = std::make_shared<GlobalState>();
|
|
|
|
|
auto this_id = std::this_thread::get_id();
|
|
|
|
|
#endif
|
|
|
|
|
auto local = global; // copy to an automatic variable for lambda capture
|
|
|
|
|
|
|
|
|
|
// structure shared between threads and this function
|
|
|
|
|
struct Result {
|
2017-11-12 22:12:03 -08:00
|
|
|
wpi::mutex mtx;
|
|
|
|
|
wpi::condition_variable cv;
|
2017-08-03 16:45:42 -05:00
|
|
|
std::unique_ptr<NetworkStream> stream;
|
|
|
|
|
std::atomic<unsigned int> count{0};
|
|
|
|
|
std::atomic<bool> done{false};
|
|
|
|
|
};
|
|
|
|
|
auto result = std::make_shared<Result>();
|
|
|
|
|
|
|
|
|
|
// start worker threads; this is I/O bound so we don't limit to # of procs
|
|
|
|
|
Logger* plogger = &logger;
|
|
|
|
|
unsigned int num_workers = 0;
|
|
|
|
|
for (const auto& server : servers) {
|
|
|
|
|
std::pair<std::string, int> server_copy{std::string{server.first},
|
|
|
|
|
server.second};
|
|
|
|
|
#ifdef HAVE_THREAD_LOCAL
|
|
|
|
|
const auto& active_tracker = server_copy;
|
|
|
|
|
#else
|
|
|
|
|
std::tuple<std::thread::id, std::string, int> active_tracker{
|
|
|
|
|
this_id, server_copy.first, server_copy.second};
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
// don't start a new worker if we had a previously still-active connection
|
|
|
|
|
// attempt to the same server
|
|
|
|
|
{
|
2019-07-07 19:17:14 -07:00
|
|
|
std::lock_guard lock(local->mtx);
|
2017-08-03 16:45:42 -05:00
|
|
|
if (local->active.count(active_tracker) > 0) continue; // already in set
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
++num_workers;
|
|
|
|
|
|
|
|
|
|
// start the worker
|
|
|
|
|
std::thread([=]() {
|
|
|
|
|
if (!result->done) {
|
|
|
|
|
// add to global state
|
|
|
|
|
{
|
2019-07-07 19:17:14 -07:00
|
|
|
std::lock_guard lock(local->mtx);
|
2017-08-03 16:45:42 -05:00
|
|
|
local->active.insert(active_tracker);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// try to connect
|
|
|
|
|
auto stream = connect(server_copy.first.c_str(), server_copy.second,
|
|
|
|
|
*plogger, timeout);
|
|
|
|
|
|
|
|
|
|
// remove from global state
|
|
|
|
|
{
|
2019-07-07 19:17:14 -07:00
|
|
|
std::lock_guard lock(local->mtx);
|
2017-08-03 16:45:42 -05:00
|
|
|
local->active.erase(active_tracker);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// successful connection
|
|
|
|
|
if (stream) {
|
2019-07-07 19:17:14 -07:00
|
|
|
std::lock_guard lock(result->mtx);
|
2017-08-03 16:45:42 -05:00
|
|
|
if (!result->done.exchange(true)) result->stream = std::move(stream);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
++result->count;
|
|
|
|
|
result->cv.notify_all();
|
2017-10-21 20:31:20 -07:00
|
|
|
})
|
|
|
|
|
.detach();
|
2017-08-03 16:45:42 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// wait for a result, timeout, or all finished
|
2019-07-07 19:17:14 -07:00
|
|
|
std::unique_lock lock(result->mtx);
|
2017-08-03 16:45:42 -05:00
|
|
|
if (timeout == 0) {
|
|
|
|
|
result->cv.wait(
|
|
|
|
|
lock, [&] { return result->stream || result->count >= num_workers; });
|
|
|
|
|
} else {
|
|
|
|
|
auto timeout_time =
|
|
|
|
|
std::chrono::steady_clock::now() + std::chrono::seconds(timeout);
|
|
|
|
|
result->cv.wait_until(lock, timeout_time, [&] {
|
|
|
|
|
return result->stream || result->count >= num_workers;
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// no need to wait for remaining worker threads; shared_ptr will clean up
|
|
|
|
|
return std::move(result->stream);
|
|
|
|
|
}
|