[wpiutil] Synchronization: fix shutdown use-after-free (#8213)

Also in ntcore, join the notifier thread on shutdown.
This prevents tsan from reporting it as a leaked thread.
This commit is contained in:
Peter Johnson
2025-09-08 21:15:00 -07:00
committed by GitHub
parent 5cd97c6353
commit f8ed2a4d92
4 changed files with 123 additions and 32 deletions

View File

@@ -13,6 +13,7 @@ using namespace nt;
std::atomic<int> InstanceImpl::s_default{-1};
std::atomic<InstanceImpl*> InstanceImpl::s_instances[kNumInstances];
wpi::mutex InstanceImpl::s_mutex;
InstanceImpl::Cleanup InstanceImpl::s_cleanup;
using namespace std::placeholders;

View File

@@ -75,6 +75,19 @@ class InstanceImpl {
static std::atomic<InstanceImpl*> s_instances[kNumInstances];
static wpi::mutex s_mutex;
struct Cleanup {
~Cleanup() {
for (auto&& inst : s_instances) {
// don't actually destroy (due to undefined global destruction order),
// but stop all listener threads
if (auto i = inst.load()) {
i->listenerStorage.Reset();
}
}
}
};
static Cleanup s_cleanup;
wpi::mutex m_mutex;
std::shared_ptr<NetworkServer> m_networkServer;
std::shared_ptr<INetworkClient> m_networkClient;

View File

@@ -355,9 +355,7 @@ void ListenerStorage::Reset() {
m_valueListeners.clear();
m_logListeners.clear();
m_timeSyncListeners.clear();
if (m_thread) {
m_thread.Stop();
}
m_thread.Join();
}
std::vector<std::pair<NT_Listener, unsigned int>>

View File

@@ -17,7 +17,17 @@
using namespace wpi;
static std::atomic_bool gShutdown{false};
// Count of active threads using the handle manager singleton. A negative value
// indicates that the manager is being destroyed. When the manager is being
// destroyed, it first biases the count negative by adding INT_MIN / 2, and then
// waits for the count to return to INT_MIN / 2 before exiting. Any active
// threads will eventually decrement the count, and new threads will see the
// negative count and re-decrement it immediately; in either case the atomic
// will be notified so the destructor can continue running.
//
// This allows us both to detect in callers if destruction is in progress (value
// < 0) and also to detect when all threads have finished.
static std::atomic_int gActive{0};
namespace {
@@ -28,26 +38,71 @@ struct State {
};
struct HandleManager {
~HandleManager() { gShutdown = true; }
~HandleManager() {
gActive.fetch_add(INT_MIN / 2);
// wake up all waiters
{
std::scoped_lock lock{mutex};
for (auto&& [handle, state] : states) {
for (auto&& waiter : state.waiters) {
waiter->notify_all();
}
}
}
// wait for other threads to finish
//
// We don't use this logic on Windows, because Windows automatically kills
// all threads on process exit, so this actually can result in deadlocks
// because the other threads never decrement the count.
#ifndef _WIN32
for (;;) {
int nowActive = gActive.load();
if (nowActive == INT_MIN / 2) {
break;
}
// wait for active count to change
gActive.wait(nowActive);
}
#endif
}
wpi::mutex mutex;
wpi::UidVector<int, 8> eventIds;
wpi::UidVector<int, 8> semaphoreIds;
wpi::DenseMap<WPI_Handle, State> states;
};
class ManagerGuard {
public:
ManagerGuard()
: m_active{gActive.fetch_add(1, std::memory_order_acquire) >= 0} {}
~ManagerGuard() {
if (gActive.fetch_sub(1, std::memory_order_release) < 0) {
gActive.notify_all();
}
}
explicit operator bool() const { return m_active; }
HandleManager& GetManager() {
static HandleManager manager;
return manager;
}
private:
bool m_active;
};
} // namespace
static HandleManager& GetManager() {
static HandleManager manager;
return manager;
}
WPI_EventHandle wpi::CreateEvent(bool manualReset, bool initialState) {
auto& manager = GetManager();
if (gShutdown) {
ManagerGuard guard;
if (!guard) {
return {};
}
auto& manager = guard.GetManager();
std::scoped_lock lock{manager.mutex};
auto index = manager.eventIds.emplace_back(0);
@@ -57,7 +112,6 @@ WPI_EventHandle wpi::CreateEvent(bool manualReset, bool initialState) {
auto& state = manager.states[handle];
state.signaled = initialState ? 1 : 0;
state.autoReset = !manualReset;
return handle;
}
@@ -68,10 +122,11 @@ void wpi::DestroyEvent(WPI_EventHandle handle) {
DestroySignalObject(handle);
auto& manager = GetManager();
if (gShutdown) {
ManagerGuard guard;
if (!guard) {
return;
}
auto& manager = guard.GetManager();
std::scoped_lock lock{manager.mutex};
manager.eventIds.erase(handle & 0xffffff);
}
@@ -93,10 +148,11 @@ void wpi::ResetEvent(WPI_EventHandle handle) {
}
WPI_SemaphoreHandle wpi::CreateSemaphore(int initialCount, int maximumCount) {
auto& manager = GetManager();
if (gShutdown) {
ManagerGuard guard;
if (!guard) {
return {};
}
auto& manager = guard.GetManager();
std::scoped_lock lock{manager.mutex};
auto index = manager.semaphoreIds.emplace_back(maximumCount);
@@ -117,10 +173,11 @@ void wpi::DestroySemaphore(WPI_SemaphoreHandle handle) {
DestroySignalObject(handle);
auto& manager = GetManager();
if (gShutdown) {
ManagerGuard guard;
if (!guard) {
return;
}
auto& manager = guard.GetManager();
std::scoped_lock lock{manager.mutex};
manager.eventIds.erase(handle & 0xffffff);
}
@@ -135,10 +192,11 @@ bool wpi::ReleaseSemaphore(WPI_SemaphoreHandle handle, int releaseCount,
}
int index = handle & 0xffffff;
auto& manager = GetManager();
if (gShutdown) {
ManagerGuard guard;
if (!guard) {
return true;
}
auto& manager = guard.GetManager();
std::scoped_lock lock{manager.mutex};
auto it = manager.states.find(handle);
if (it == manager.states.end()) {
@@ -181,11 +239,14 @@ std::span<WPI_Handle> wpi::WaitForObjects(std::span<const WPI_Handle> handles,
std::span<WPI_Handle> wpi::WaitForObjects(std::span<const WPI_Handle> handles,
std::span<WPI_Handle> signaled,
double timeout, bool* timedOut) {
auto& manager = GetManager();
if (gShutdown) {
*timedOut = false;
ManagerGuard guard;
if (!guard) {
if (timedOut) {
*timedOut = false;
}
return {};
}
auto& manager = guard.GetManager();
std::unique_lock lock{manager.mutex};
wpi::condition_variable cv;
bool addedWaiters = false;
@@ -233,6 +294,13 @@ std::span<WPI_Handle> wpi::WaitForObjects(std::span<const WPI_Handle> handles,
}
}
if (gActive.load(std::memory_order_acquire) < 0) {
// shutting down
timedOutVal = false;
count = 0;
break;
}
if (timeout < 0) {
cv.wait(lock);
} else {
@@ -242,6 +310,13 @@ std::span<WPI_Handle> wpi::WaitForObjects(std::span<const WPI_Handle> handles,
timedOutVal = true;
}
}
if (gActive.load(std::memory_order_acquire) < 0) {
// shutting down
timedOutVal = false;
count = 0;
break;
}
}
if (addedWaiters) {
@@ -263,10 +338,11 @@ std::span<WPI_Handle> wpi::WaitForObjects(std::span<const WPI_Handle> handles,
void wpi::CreateSignalObject(WPI_Handle handle, bool manualReset,
bool initialState) {
auto& manager = GetManager();
if (gShutdown) {
ManagerGuard guard;
if (!guard) {
return;
}
auto& manager = guard.GetManager();
std::scoped_lock lock{manager.mutex};
auto& state = manager.states[handle];
state.signaled = initialState ? 1 : 0;
@@ -274,10 +350,11 @@ void wpi::CreateSignalObject(WPI_Handle handle, bool manualReset,
}
void wpi::SetSignalObject(WPI_Handle handle) {
auto& manager = GetManager();
if (gShutdown) {
ManagerGuard guard;
if (!guard) {
return;
}
auto& manager = guard.GetManager();
std::scoped_lock lock{manager.mutex};
auto it = manager.states.find(handle);
if (it == manager.states.end()) {
@@ -295,10 +372,11 @@ void wpi::SetSignalObject(WPI_Handle handle) {
}
void wpi::ResetSignalObject(WPI_Handle handle) {
auto& manager = GetManager();
if (gShutdown) {
ManagerGuard guard;
if (!guard) {
return;
}
auto& manager = guard.GetManager();
std::scoped_lock lock{manager.mutex};
auto it = manager.states.find(handle);
if (it != manager.states.end()) {
@@ -307,10 +385,11 @@ void wpi::ResetSignalObject(WPI_Handle handle) {
}
void wpi::DestroySignalObject(WPI_Handle handle) {
auto& manager = GetManager();
if (gShutdown) {
ManagerGuard guard;
if (!guard) {
return;
}
auto& manager = guard.GetManager();
std::scoped_lock lock{manager.mutex};
auto it = manager.states.find(handle);