diff --git a/include/ntcore_c.h b/include/ntcore_c.h index 8084235358..59c57d77d9 100644 --- a/include/ntcore_c.h +++ b/include/ntcore_c.h @@ -267,6 +267,9 @@ void NT_Flush(void); * Callback Creation Functions */ +void NT_SetListenerOnStart(void (*on_start)(void *data), void *data); +void NT_SetListenerOnExit(void (*on_exit)(void *data), void *data); + typedef void (*NT_EntryListenerCallback)( unsigned int uid, void *data, const char *name, size_t name_len, const struct NT_Value *value, unsigned int flags); diff --git a/include/ntcore_cpp.h b/include/ntcore_cpp.h index 634b00059a..5ebc4098de 100644 --- a/include/ntcore_cpp.h +++ b/include/ntcore_cpp.h @@ -180,6 +180,9 @@ void Flush(); * Callback Creation Functions */ +void SetListenerOnStart(std::function on_start); +void SetListenerOnExit(std::function on_exit); + typedef std::function value, unsigned int flags)> EntryListenerCallback; diff --git a/java/lib/NetworkTablesJNI.cpp b/java/lib/NetworkTablesJNI.cpp index ce40d0ca0f..7dbf42b29d 100644 --- a/java/lib/NetworkTablesJNI.cpp +++ b/java/lib/NetworkTablesJNI.cpp @@ -1,9 +1,15 @@ #include +#include #include +#include +#include #include +#include +#include #include "edu_wpi_first_wpilibj_networktables_NetworkTablesJNI.h" #include "ntcore.h" +#include "atomic_static.h" // // Globals and load/unload @@ -18,6 +24,24 @@ static jclass connectionInfoCls = nullptr; static jclass entryInfoCls = nullptr; static jclass keyNotDefinedEx = nullptr; static jclass persistentEx = nullptr; +// Thread-attached environment for listener callbacks. +static JNIEnv *listenerEnv = nullptr; + +static void ListenerOnStart() { + if (!jvm) return; + JNIEnv *env; + if (jvm->AttachCurrentThread(reinterpret_cast(&env), + nullptr) != JNI_OK) + return; + if (!env || !env->functions) return; + listenerEnv = env; +} + +static void ListenerOnExit() { + listenerEnv = nullptr; + if (!jvm) return; + jvm->DetachCurrentThread(); +} extern "C" { @@ -73,6 +97,10 @@ JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved) { if (!persistentEx) return JNI_ERR; env->DeleteLocalRef(local); + // Initial configuration of listener start/exit + nt::SetListenerOnStart(ListenerOnStart); + nt::SetListenerOnExit(ListenerOnExit); + return JNI_VERSION_1_6; } @@ -962,32 +990,27 @@ JNIEXPORT jint JNICALL Java_edu_wpi_first_wpilibj_networktables_NetworkTablesJNI JavaStringRef(envouter, prefix), [=](unsigned int uid, nt::StringRef name, std::shared_ptr value, unsigned int flags_) { - // need to attach as we're coming from a separate thread here - if (!jvm) return; - JNIEnv *env; - if (jvm->AttachCurrentThread(reinterpret_cast(&env), - nullptr) != JNI_OK) - return; + JNIEnv *env = listenerEnv; if (!env || !env->functions) return; - { - // get the handler - auto handler = listener_global->obj(); + // get the handler + auto handler = listener_global->obj(); - // convert the value into the appropriate Java type - jobject jobj = ToJavaObject(env, *value); - if (!jobj) goto done; + // convert the value into the appropriate Java type + jobject jobj = ToJavaObject(env, *value); + if (!jobj) return; - if (env->ExceptionCheck()) { - env->ExceptionDescribe(); - goto done; - } - env->CallVoidMethod(handler, mid, (jint)uid, ToJavaString(env, name), - jobj, (jint)(flags_)); - if (env->ExceptionCheck()) env->ExceptionDescribe(); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); + return; + } + env->CallVoidMethod(handler, mid, (jint)uid, ToJavaString(env, name), + jobj, (jint)(flags_)); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); } -done: - jvm->DetachCurrentThread(); }, flags); } @@ -1027,33 +1050,28 @@ JNIEXPORT jint JNICALL Java_edu_wpi_first_wpilibj_networktables_NetworkTablesJNI return nt::AddConnectionListener( [=](unsigned int uid, bool connected, const nt::ConnectionInfo& conn) { - // need to attach as we're coming from a separate thread here - if (!jvm) return; - JNIEnv *env; - if (jvm->AttachCurrentThread(reinterpret_cast(&env), - nullptr) != JNI_OK) - return; + JNIEnv *env = listenerEnv; if (!env || !env->functions) return; - { - // get the handler - auto handler = listener_global->obj(); - //if (!handler) goto done; // can happen due to weak reference + // get the handler + auto handler = listener_global->obj(); + //if (!handler) goto done; // can happen due to weak reference - // convert into the appropriate Java type - jobject jobj = ToJavaObject(env, conn); - if (!jobj) goto done; + // convert into the appropriate Java type + jobject jobj = ToJavaObject(env, conn); + if (!jobj) return; - if (env->ExceptionCheck()) { - env->ExceptionDescribe(); - goto done; - } - env->CallVoidMethod(handler, mid, (jint)uid, - (jboolean)(connected ? 1 : 0), jobj); - if (env->ExceptionCheck()) env->ExceptionDescribe(); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); + return; + } + env->CallVoidMethod(handler, mid, (jint)uid, + (jboolean)(connected ? 1 : 0), jobj); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); } -done: - jvm->DetachCurrentThread(); }, immediateNotify != JNI_FALSE); } @@ -1253,49 +1271,176 @@ JNIEXPORT jlong JNICALL Java_edu_wpi_first_wpilibj_networktables_NetworkTablesJN return nt::Now(); } +} // extern "C" + +// Thread where log callbacks are actually performed. +// +// JNI's AttachCurrentThread() creates a Java Thread object on every +// invocation, which is both time inefficient and causes issues with Eclipse +// (which tries to keep a thread list up-to-date and thus gets swamped). +// +// Instead, this class attaches just once. When a hardware notification +// occurs, a condition variable wakes up this thread and this thread actually +// makes the call into Java. +class LoggerThreadJNI { + public: + static LoggerThreadJNI& GetInstance() { + ATOMIC_STATIC(LoggerThreadJNI, instance); + return instance; + } + ~LoggerThreadJNI(); + void SetFunc(JNIEnv* env, jobject func, jmethodID mid); + void Start(); + void Stop(); + + void Log(unsigned int level, const char* file, unsigned int line, + const char* msg); + + private: + void ThreadMain(); + + std::thread m_thread; + std::mutex m_mutex; + std::condition_variable m_cond; + std::atomic_bool m_active{false}; + struct LogMessage { + LogMessage(unsigned int level_, const char* file_, unsigned int line_, + const char* msg_) + : level(level_), file(file_), line(line_), msg(msg_) {} + unsigned int level; + const char* file; + unsigned int line; + std::string msg; + }; + std::queue m_queue; + std::mutex m_shutdown_mutex; + std::condition_variable m_shutdown_cv; + bool m_shutdown = false; + jobject m_func = nullptr; + jmethodID m_mid; + + ATOMIC_STATIC_DECL(LoggerThreadJNI) +}; + +LoggerThreadJNI::~LoggerThreadJNI() { + Stop(); +} + +void LoggerThreadJNI::SetFunc(JNIEnv* env, jobject func, jmethodID mid) { + std::lock_guard lock(m_mutex); + // free global reference + if (m_func) env->DeleteGlobalRef(m_func); + // create global reference + m_func = env->NewGlobalRef(func); + m_mid = mid; +} + +void LoggerThreadJNI::Start() { + { + std::lock_guard lock(m_mutex); + if (m_active) return; + m_active = true; + } + { + std::lock_guard lock(m_shutdown_mutex); + m_shutdown = false; + } + m_thread = std::thread(&LoggerThreadJNI::ThreadMain, this); +} + +void LoggerThreadJNI::Stop() { + { + std::lock_guard lock(m_mutex); + if (!m_active) return; + m_active = false; + } + m_cond.notify_one(); // wake up thread + + // join threads, with timeout + if (m_thread.joinable()) { + std::unique_lock lock(m_shutdown_mutex); + auto timeout_time = + std::chrono::steady_clock::now() + std::chrono::seconds(1); + if (m_shutdown_cv.wait_until(lock, timeout_time, + [&] { return m_shutdown; })) + m_thread.join(); + else + m_thread.detach(); // timed out, detach it + } +} + +void LoggerThreadJNI::Log(unsigned int level, const char *file, + unsigned int line, const char *msg) { + std::lock_guard lock(m_mutex); + if (!m_active) return; + m_queue.emplace(level, file, line, msg); + m_cond.notify_one(); +} + +void LoggerThreadJNI::ThreadMain() { + JNIEnv *env; + jint rs = jvm->AttachCurrentThread((void**)&env, NULL); + if (rs != JNI_OK) return; + + std::unique_lock lock(m_mutex); + while (m_active) { + m_cond.wait(lock, [&] { return !m_active || !m_queue.empty(); }); + if (!m_active) break; + while (!m_queue.empty()) { + if (!m_active) break; + auto item = std::move(m_queue.front()); + m_queue.pop(); + auto func = m_func; + auto mid = m_mid; + lock.unlock(); // don't hold mutex during callback execution + env->CallVoidMethod(func, mid, (jint)item.level, + ToJavaString(env, item.file), (jint)item.line, + ToJavaString(env, item.msg)); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); + } + lock.lock(); + } + } + + if (jvm) jvm->DetachCurrentThread(); + + // use condition variable to signal thread shutdown + { + std::lock_guard lock(m_shutdown_mutex); + m_shutdown = true; + m_shutdown_cv.notify_one(); + } +} + +extern "C" { + /* * Class: edu_wpi_first_wpilibj_networktables_NetworkTablesJNI * Method: setLogger * Signature: (Ledu/wpi/first/wpilibj/networktables/NetworkTablesJNI/LoggerFunction;I)V */ JNIEXPORT void JNICALL Java_edu_wpi_first_wpilibj_networktables_NetworkTablesJNI_setLogger - (JNIEnv *envouter, jclass, jobject func, jint minLevel) + (JNIEnv *env, jclass, jobject func, jint minLevel) { - // the shared pointer to the global will keep it around until the - // a new logger is set - auto func_global = std::make_shared>(envouter, func); - // cls is a temporary here; cannot be used within callback functor - jclass cls = envouter->GetObjectClass(func); + jclass cls = env->GetObjectClass(func); if (!cls) return; // method ids, on the other hand, are safe to retain - jmethodID mid = envouter->GetMethodID( + jmethodID mid = env->GetMethodID( cls, "apply", "(ILjava/lang/String;ILjava/lang/String;)V"); if (!mid) return; - return nt::SetLogger( - [=](unsigned int level, const char *file, unsigned int line, - const char *msg) { - // need to attach as we're coming from a separate thread here - if (!jvm) return; - JNIEnv *env; - if (jvm->AttachCurrentThread(reinterpret_cast(&env), - nullptr) != JNI_OK) - return; - if (!env || !env->functions) return; + auto& thread = LoggerThreadJNI::GetInstance(); + thread.SetFunc(env, func, mid); + thread.Start(); - { - // get the handler - auto handler = func_global->obj(); - if (!handler) goto done; // shouldn't happen, but ignore if it does - - env->CallVoidMethod(handler, mid, (jint)level, ToJavaString(env, file), - (jint)line, ToJavaString(env, msg)); - if (env->ExceptionCheck()) env->ExceptionDescribe(); - } -done: - jvm->DetachCurrentThread(); + nt::SetLogger( + [](unsigned int level, const char *file, unsigned int line, + const char *msg) { + LoggerThreadJNI::GetInstance().Log(level, file, line, msg); }, minLevel); } diff --git a/src/Notifier.cpp b/src/Notifier.cpp index f1db1b659d..fb2e8dcbc7 100644 --- a/src/Notifier.cpp +++ b/src/Notifier.cpp @@ -54,6 +54,8 @@ void Notifier::Stop() { } void Notifier::ThreadMain() { + if (m_on_start) m_on_start(); + std::unique_lock lock(m_mutex); while (m_active) { while (m_entry_notifications.empty() && m_conn_notifications.empty()) { @@ -135,6 +137,8 @@ void Notifier::ThreadMain() { } done: + if (m_on_exit) m_on_exit(); + // use condition variable to signal thread shutdown { std::lock_guard lock(m_shutdown_mutex); diff --git a/src/Notifier.h b/src/Notifier.h index 487680cf58..d10054cf95 100644 --- a/src/Notifier.h +++ b/src/Notifier.h @@ -37,6 +37,9 @@ class Notifier { bool local_notifiers() const { return m_local_notifiers; } static bool destroyed() { return s_destroyed; } + void SetOnStart(std::function on_start) { m_on_start = on_start; } + void SetOnExit(std::function on_exit) { m_on_exit = on_exit; } + unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback, unsigned int flags); @@ -105,6 +108,9 @@ class Notifier { std::condition_variable m_shutdown_cv; bool m_shutdown = false; + std::function m_on_start; + std::function m_on_exit; + ATOMIC_STATIC_DECL(Notifier) static bool s_destroyed; }; diff --git a/src/ntcore_c.cpp b/src/ntcore_c.cpp index cf16fc0c01..d7f545687f 100644 --- a/src/ntcore_c.cpp +++ b/src/ntcore_c.cpp @@ -167,6 +167,14 @@ void NT_Flush(void) { nt::Flush(); } * Callback Creation Functions */ +void NT_SetListenerOnStart(void (*on_start)(void *data), void *data) { + nt::SetListenerOnStart([=]() { on_start(data); }); +} + +void NT_SetListenerOnExit(void (*on_exit)(void *data), void *data) { + nt::SetListenerOnExit([=]() { on_exit(data); }); +} + unsigned int NT_AddEntryListener(const char *prefix, size_t prefix_len, void *data, NT_EntryListenerCallback callback, diff --git a/src/ntcore_cpp.cpp b/src/ntcore_cpp.cpp index 6e2c42b7d6..c50b3d4b64 100644 --- a/src/ntcore_cpp.cpp +++ b/src/ntcore_cpp.cpp @@ -65,6 +65,14 @@ void Flush() { * Callback Creation Functions */ +void SetListenerOnStart(std::function on_start) { + Notifier::GetInstance().SetOnStart(on_start); +} + +void SetListenerOnExit(std::function on_exit) { + Notifier::GetInstance().SetOnExit(on_exit); +} + unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback, unsigned int flags) { Notifier& notifier = Notifier::GetInstance();