[wpiutil] Add synchronization primitives

These enable more consistent use of synchronization across the
native libraries.  Users can create Event and Semaphore primitives, but
in addition, libraries can set up any handle as an Event-type signal.
This commit is contained in:
Peter Johnson
2021-08-03 00:05:47 -07:00
parent e32499c546
commit 87e34967ef
7 changed files with 1424 additions and 1 deletions

View File

@@ -0,0 +1,368 @@
// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
#include "wpi/Synchronization.h"
#include <algorithm>
#include <cstring>
#include <mutex>
#include "wpi/DenseMap.h"
#include "wpi/SmallVector.h"
#include "wpi/UidVector.h"
#include "wpi/condition_variable.h"
#include "wpi/mutex.h"
using namespace wpi;
namespace {
struct State {
int signaled{0};
bool autoReset{false};
wpi::SmallVector<wpi::condition_variable*, 2> waiters;
};
struct HandleManager {
wpi::mutex mutex;
wpi::UidVector<int, 8> eventIds;
wpi::UidVector<int, 8> semaphoreIds;
wpi::DenseMap<WPI_Handle, State> states;
};
} // namespace
static HandleManager& GetManager() {
static HandleManager manager;
return manager;
}
WPI_EventHandle wpi::CreateEvent(bool manualReset, bool initialState) {
auto& manager = GetManager();
std::scoped_lock lock{manager.mutex};
auto index = manager.eventIds.emplace_back(0);
WPI_EventHandle handle = (kHandleTypeEvent << 24) | (index & 0xffffff);
// configure state data
auto& state = manager.states[handle];
state.signaled = initialState ? 1 : 0;
state.autoReset = !manualReset;
return handle;
}
void wpi::DestroyEvent(WPI_EventHandle handle) {
if ((handle >> 24) != kHandleTypeEvent) {
return;
}
DestroySignalObject(handle);
auto& manager = GetManager();
std::scoped_lock lock{manager.mutex};
manager.eventIds.erase(handle & 0xffffff);
}
void wpi::SetEvent(WPI_EventHandle handle) {
if ((handle >> 24) != kHandleTypeEvent) {
return;
}
SetSignalObject(handle);
}
void wpi::ResetEvent(WPI_EventHandle handle) {
if ((handle >> 24) != kHandleTypeEvent) {
return;
}
ResetSignalObject(handle);
}
WPI_SemaphoreHandle wpi::CreateSemaphore(int initialCount, int maximumCount) {
auto& manager = GetManager();
std::scoped_lock lock{manager.mutex};
auto index = manager.semaphoreIds.emplace_back(maximumCount);
WPI_EventHandle handle = (kHandleTypeSemaphore << 24) | (index & 0xffffff);
// configure state data
auto& state = manager.states[handle];
state.signaled = initialCount;
state.autoReset = true;
return handle;
}
void wpi::DestroySemaphore(WPI_SemaphoreHandle handle) {
if ((handle >> 24) != kHandleTypeSemaphore) {
return;
}
DestroySignalObject(handle);
auto& manager = GetManager();
std::scoped_lock lock{manager.mutex};
manager.eventIds.erase(handle & 0xffffff);
}
bool wpi::ReleaseSemaphore(WPI_SemaphoreHandle handle, int releaseCount,
int* prevCount) {
if ((handle >> 24) != kHandleTypeSemaphore) {
return false;
}
if (releaseCount <= 0) {
return false;
}
int index = handle & 0xffffff;
auto& manager = GetManager();
std::scoped_lock lock{manager.mutex};
auto it = manager.states.find(handle);
if (it == manager.states.end()) {
return false;
}
auto& state = it->second;
int maxCount = manager.eventIds[index];
if (prevCount) {
*prevCount = state.signaled;
}
if ((maxCount - state.signaled) < releaseCount) {
return false;
}
state.signaled += releaseCount;
for (auto& waiter : state.waiters) {
waiter->notify_all();
}
return true;
}
bool wpi::WaitForObject(WPI_Handle handle) {
return WaitForObject(handle, -1, nullptr);
}
bool wpi::WaitForObject(WPI_Handle handle, double timeout, bool* timedOut) {
WPI_Handle signaledValue;
auto signaled = WaitForObjects(
wpi::span(&handle, 1), wpi::span(&signaledValue, 1), timeout, timedOut);
if (signaled.empty()) {
return false;
}
return (signaled[0] & 0x80000000ul) == 0;
}
wpi::span<WPI_Handle> wpi::WaitForObjects(wpi::span<const WPI_Handle> handles,
wpi::span<WPI_Handle> signaled) {
return WaitForObjects(handles, signaled, -1, nullptr);
}
wpi::span<WPI_Handle> wpi::WaitForObjects(wpi::span<const WPI_Handle> handles,
wpi::span<WPI_Handle> signaled,
double timeout, bool* timedOut) {
auto& manager = GetManager();
std::unique_lock lock{manager.mutex};
wpi::condition_variable cv;
bool addedWaiters = false;
bool timedOutVal = false;
size_t count = 0;
for (;;) {
for (auto handle : handles) {
auto it = manager.states.find(handle);
if (it == manager.states.end()) {
if (count < signaled.size()) {
// treat a non-existent handle as signaled, but set the error bit
signaled[count++] = handle | 0x80000000ul;
}
} else {
auto& state = it->second;
if (state.signaled > 0) {
if (count < signaled.size()) {
signaled[count++] = handle;
}
if (state.autoReset) {
--state.signaled;
if (state.signaled < 0) {
state.signaled = 0;
}
}
}
}
}
if (timedOutVal || count != 0) {
break;
}
if (timeout == 0) {
timedOutVal = true;
break;
}
if (!addedWaiters) {
addedWaiters = true;
for (auto handle : handles) {
auto& state = manager.states[handle];
state.waiters.emplace_back(&cv);
}
}
if (timeout < 0) {
cv.wait(lock);
} else {
auto timeoutTime = std::chrono::steady_clock::now() +
std::chrono::duration<double>(timeout);
if (cv.wait_until(lock, timeoutTime) == std::cv_status::timeout) {
timedOutVal = true;
}
}
}
if (addedWaiters) {
for (auto handle : handles) {
auto& state = manager.states[handle];
auto it = std::find(state.waiters.begin(), state.waiters.end(), &cv);
if (it != state.waiters.end()) {
state.waiters.erase(it);
}
}
}
if (timedOut) {
*timedOut = timedOutVal;
}
return signaled.subspan(0, count);
}
void wpi::CreateSignalObject(WPI_Handle handle, bool manualReset,
bool initialState) {
auto& manager = GetManager();
std::scoped_lock lock{manager.mutex};
auto& state = manager.states[handle];
state.signaled = initialState ? 1 : 0;
state.autoReset = !manualReset;
}
void wpi::SetSignalObject(WPI_Handle handle) {
auto& manager = GetManager();
std::scoped_lock lock{manager.mutex};
auto it = manager.states.find(handle);
if (it == manager.states.end()) {
return;
}
auto& state = it->second;
state.signaled = 1;
for (auto& waiter : state.waiters) {
waiter->notify_all();
if (state.autoReset) {
// expect the first waiter to reset it
break;
}
}
}
void wpi::ResetSignalObject(WPI_Handle handle) {
auto& manager = GetManager();
std::scoped_lock lock{manager.mutex};
auto it = manager.states.find(handle);
if (it != manager.states.end()) {
it->second.signaled = 0;
}
}
void wpi::DestroySignalObject(WPI_Handle handle) {
auto& manager = GetManager();
std::scoped_lock lock{manager.mutex};
auto it = manager.states.find(handle);
if (it != manager.states.end()) {
// wake up any waiters
for (auto& waiter : it->second.waiters) {
waiter->notify_all();
}
manager.states.erase(it);
}
}
extern "C" {
WPI_EventHandle WPI_CreateEvent(int manual_reset, int initial_state) {
return wpi::CreateEvent(manual_reset != 0, initial_state != 0);
}
void WPI_DestroyEvent(WPI_EventHandle handle) {
wpi::DestroyEvent(handle);
}
void WPI_SetEvent(WPI_EventHandle handle) {
wpi::SetEvent(handle);
}
void WPI_ResetEvent(WPI_EventHandle handle) {
wpi::ResetEvent(handle);
}
WPI_SemaphoreHandle WPI_CreateSemaphore(int initial_count, int maximum_count) {
return wpi::CreateSemaphore(initial_count, maximum_count);
}
void WPI_DestroySemaphore(WPI_SemaphoreHandle handle) {
wpi::DestroySemaphore(handle);
}
int WPI_ReleaseSemaphore(WPI_SemaphoreHandle handle, int release_count,
int* prev_count) {
return wpi::ReleaseSemaphore(handle, release_count, prev_count);
}
int WPI_WaitForObject(WPI_Handle handle) {
return wpi::WaitForObject(handle);
}
int WPI_WaitForObjectTimeout(WPI_Handle handle, double timeout,
int* timed_out) {
bool timedOutBool;
int rv = wpi::WaitForObject(handle, timeout, &timedOutBool);
*timed_out = timedOutBool ? 1 : 0;
return rv;
}
int WPI_WaitForObjects(const WPI_Handle* handles, int handles_count,
WPI_Handle* signaled) {
return wpi::WaitForObjects(wpi::span(handles, handles_count),
wpi::span(signaled, handles_count))
.size();
}
int WPI_WaitForObjectsTimeout(const WPI_Handle* handles, int handles_count,
WPI_Handle* signaled, double timeout,
int* timed_out) {
bool timedOutBool;
auto signaledResult = wpi::WaitForObjects(wpi::span(handles, handles_count),
wpi::span(signaled, handles_count),
timeout, &timedOutBool);
*timed_out = timedOutBool ? 1 : 0;
return signaledResult.size();
}
void WPI_CreateSignalObject(WPI_Handle handle, int manual_reset,
int initial_state) {
wpi::CreateSignalObject(handle, manual_reset, initial_state);
}
void WPI_SetSignalObject(WPI_Handle handle) {
wpi::SetSignalObject(handle);
}
void WPI_ResetSignalObject(WPI_Handle handle) {
wpi::ResetSignalObject(handle);
}
void WPI_DestroySignalObject(WPI_Handle handle) {
wpi::DestroySignalObject(handle);
}
} // extern "C"

View File

@@ -6,6 +6,7 @@
#include "edu_wpi_first_util_WPIUtilJNI.h"
#include "wpi/PortForwarder.h"
#include "wpi/Synchronization.h"
#include "wpi/jni_util.h"
#include "wpi/timestamp.h"
@@ -14,6 +15,8 @@ using namespace wpi::java;
static bool mockTimeEnabled = false;
static uint64_t mockNow = 0;
static JException interruptedEx;
extern "C" {
JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM* vm, void* reserved) {
@@ -22,10 +25,21 @@ JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM* vm, void* reserved) {
return JNI_ERR;
}
interruptedEx = JException(env, "java/lang/InterruptedException");
if (!interruptedEx) {
return JNI_ERR;
}
return JNI_VERSION_1_6;
}
JNIEXPORT void JNICALL JNI_OnUnload(JavaVM* vm, void* reserved) {}
JNIEXPORT void JNICALL JNI_OnUnload(JavaVM* vm, void* reserved) {
JNIEnv* env;
if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION_1_6) != JNI_OK) {
return;
}
interruptedEx.free(env);
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
@@ -94,4 +108,169 @@ Java_edu_wpi_first_util_WPIUtilJNI_removePortForwarder
wpi::PortForwarder::GetInstance().Remove(port);
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: createEvent
* Signature: (ZZ)I
*/
JNIEXPORT jint JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_createEvent
(JNIEnv*, jclass, jboolean manualReset, jboolean initialState)
{
return wpi::CreateEvent(manualReset, initialState);
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: destroyEvent
* Signature: (I)V
*/
JNIEXPORT void JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_destroyEvent
(JNIEnv*, jclass, jint eventHandle)
{
wpi::DestroyEvent(eventHandle);
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: setEvent
* Signature: (I)V
*/
JNIEXPORT void JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_setEvent
(JNIEnv*, jclass, jint eventHandle)
{
wpi::SetEvent(eventHandle);
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: resetEvent
* Signature: (I)V
*/
JNIEXPORT void JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_resetEvent
(JNIEnv*, jclass, jint eventHandle)
{
wpi::ResetEvent(eventHandle);
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: createSemaphore
* Signature: (II)I
*/
JNIEXPORT jint JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_createSemaphore
(JNIEnv*, jclass, jint initialCount, jint maximumCount)
{
return wpi::CreateSemaphore(initialCount, maximumCount);
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: destroySemaphore
* Signature: (I)V
*/
JNIEXPORT void JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_destroySemaphore
(JNIEnv*, jclass, jint semHandle)
{
wpi::DestroySemaphore(semHandle);
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: releaseSemaphore
* Signature: (II)Z
*/
JNIEXPORT jboolean JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_releaseSemaphore
(JNIEnv*, jclass, jint semHandle, jint releaseCount)
{
return wpi::ReleaseSemaphore(semHandle, releaseCount);
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: waitForObject
* Signature: (I)V
*/
JNIEXPORT void JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_waitForObject
(JNIEnv* env, jclass, jint handle)
{
if (!wpi::WaitForObject(handle)) {
interruptedEx.Throw(env, "WaitForObject interrupted");
}
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: waitForObjectTimeout
* Signature: (ID)Z
*/
JNIEXPORT jboolean JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_waitForObjectTimeout
(JNIEnv* env, jclass, jint handle, jdouble timeout)
{
bool timedOut;
if (!wpi::WaitForObject(handle, timeout, &timedOut) && !timedOut) {
interruptedEx.Throw(env, "WaitForObject interrupted");
return false;
}
return timedOut;
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: waitForObjects
* Signature: ([I)[I
*/
JNIEXPORT jintArray JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_waitForObjects
(JNIEnv* env, jclass, jintArray handles)
{
JIntArrayRef handlesArr{env, handles};
wpi::SmallVector<WPI_Handle, 8> signaledBuf;
signaledBuf.resize(handlesArr.size());
wpi::span<const WPI_Handle> handlesArr2{
reinterpret_cast<const WPI_Handle*>(handlesArr.array().data()),
handlesArr.size()};
auto signaled = wpi::WaitForObjects(handlesArr2, signaledBuf);
if (signaled.empty()) {
interruptedEx.Throw(env, "WaitForObjects interrupted");
return nullptr;
}
return MakeJIntArray(env, signaled);
}
/*
* Class: edu_wpi_first_util_WPIUtilJNI
* Method: waitForObjectsTimeout
* Signature: ([ID)[I
*/
JNIEXPORT jintArray JNICALL
Java_edu_wpi_first_util_WPIUtilJNI_waitForObjectsTimeout
(JNIEnv* env, jclass, jintArray handles, jdouble timeout)
{
JIntArrayRef handlesArr{env, handles};
wpi::SmallVector<WPI_Handle, 8> signaledBuf;
signaledBuf.resize(handlesArr.size());
wpi::span<const WPI_Handle> handlesArr2{
reinterpret_cast<const WPI_Handle*>(handlesArr.array().data()),
handlesArr.size()};
bool timedOut;
auto signaled =
wpi::WaitForObjects(handlesArr2, signaledBuf, timeout, &timedOut);
if (signaled.empty() && !timedOut) {
interruptedEx.Throw(env, "WaitForObjects interrupted");
return nullptr;
}
return MakeJIntArray(env, signaled);
}
} // extern "C"