From 32001427d40433d9cda1800a0740306d4523a460 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Sun, 20 Dec 2015 20:42:09 -0800 Subject: [PATCH] Java: call JNI AttachCurrentThread less frequently. Each call to AttachCurrentThread results in a new Java thread object being created. This is inefficient and also causes debugging issues with Eclipse due to constant creation and removal of threads. Now AttachCurrentThread is only called once for (all) listeners and once for logging (if used). --- include/ntcore_c.h | 3 + include/ntcore_cpp.h | 3 + java/lib/NetworkTablesJNI.cpp | 287 +++++++++++++++++++++++++--------- src/Notifier.cpp | 4 + src/Notifier.h | 6 + src/ntcore_c.cpp | 8 + src/ntcore_cpp.cpp | 8 + 7 files changed, 248 insertions(+), 71 deletions(-) diff --git a/include/ntcore_c.h b/include/ntcore_c.h index 8084235358..59c57d77d9 100644 --- a/include/ntcore_c.h +++ b/include/ntcore_c.h @@ -267,6 +267,9 @@ void NT_Flush(void); * Callback Creation Functions */ +void NT_SetListenerOnStart(void (*on_start)(void *data), void *data); +void NT_SetListenerOnExit(void (*on_exit)(void *data), void *data); + typedef void (*NT_EntryListenerCallback)( unsigned int uid, void *data, const char *name, size_t name_len, const struct NT_Value *value, unsigned int flags); diff --git a/include/ntcore_cpp.h b/include/ntcore_cpp.h index 634b00059a..5ebc4098de 100644 --- a/include/ntcore_cpp.h +++ b/include/ntcore_cpp.h @@ -180,6 +180,9 @@ void Flush(); * Callback Creation Functions */ +void SetListenerOnStart(std::function on_start); +void SetListenerOnExit(std::function on_exit); + typedef std::function value, unsigned int flags)> EntryListenerCallback; diff --git a/java/lib/NetworkTablesJNI.cpp b/java/lib/NetworkTablesJNI.cpp index ce40d0ca0f..7dbf42b29d 100644 --- a/java/lib/NetworkTablesJNI.cpp +++ b/java/lib/NetworkTablesJNI.cpp @@ -1,9 +1,15 @@ #include +#include #include +#include +#include #include +#include +#include #include "edu_wpi_first_wpilibj_networktables_NetworkTablesJNI.h" #include "ntcore.h" +#include "atomic_static.h" // // Globals and load/unload @@ -18,6 +24,24 @@ static jclass connectionInfoCls = nullptr; static jclass entryInfoCls = nullptr; static jclass keyNotDefinedEx = nullptr; static jclass persistentEx = nullptr; +// Thread-attached environment for listener callbacks. +static JNIEnv *listenerEnv = nullptr; + +static void ListenerOnStart() { + if (!jvm) return; + JNIEnv *env; + if (jvm->AttachCurrentThread(reinterpret_cast(&env), + nullptr) != JNI_OK) + return; + if (!env || !env->functions) return; + listenerEnv = env; +} + +static void ListenerOnExit() { + listenerEnv = nullptr; + if (!jvm) return; + jvm->DetachCurrentThread(); +} extern "C" { @@ -73,6 +97,10 @@ JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved) { if (!persistentEx) return JNI_ERR; env->DeleteLocalRef(local); + // Initial configuration of listener start/exit + nt::SetListenerOnStart(ListenerOnStart); + nt::SetListenerOnExit(ListenerOnExit); + return JNI_VERSION_1_6; } @@ -962,32 +990,27 @@ JNIEXPORT jint JNICALL Java_edu_wpi_first_wpilibj_networktables_NetworkTablesJNI JavaStringRef(envouter, prefix), [=](unsigned int uid, nt::StringRef name, std::shared_ptr value, unsigned int flags_) { - // need to attach as we're coming from a separate thread here - if (!jvm) return; - JNIEnv *env; - if (jvm->AttachCurrentThread(reinterpret_cast(&env), - nullptr) != JNI_OK) - return; + JNIEnv *env = listenerEnv; if (!env || !env->functions) return; - { - // get the handler - auto handler = listener_global->obj(); + // get the handler + auto handler = listener_global->obj(); - // convert the value into the appropriate Java type - jobject jobj = ToJavaObject(env, *value); - if (!jobj) goto done; + // convert the value into the appropriate Java type + jobject jobj = ToJavaObject(env, *value); + if (!jobj) return; - if (env->ExceptionCheck()) { - env->ExceptionDescribe(); - goto done; - } - env->CallVoidMethod(handler, mid, (jint)uid, ToJavaString(env, name), - jobj, (jint)(flags_)); - if (env->ExceptionCheck()) env->ExceptionDescribe(); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); + return; + } + env->CallVoidMethod(handler, mid, (jint)uid, ToJavaString(env, name), + jobj, (jint)(flags_)); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); } -done: - jvm->DetachCurrentThread(); }, flags); } @@ -1027,33 +1050,28 @@ JNIEXPORT jint JNICALL Java_edu_wpi_first_wpilibj_networktables_NetworkTablesJNI return nt::AddConnectionListener( [=](unsigned int uid, bool connected, const nt::ConnectionInfo& conn) { - // need to attach as we're coming from a separate thread here - if (!jvm) return; - JNIEnv *env; - if (jvm->AttachCurrentThread(reinterpret_cast(&env), - nullptr) != JNI_OK) - return; + JNIEnv *env = listenerEnv; if (!env || !env->functions) return; - { - // get the handler - auto handler = listener_global->obj(); - //if (!handler) goto done; // can happen due to weak reference + // get the handler + auto handler = listener_global->obj(); + //if (!handler) goto done; // can happen due to weak reference - // convert into the appropriate Java type - jobject jobj = ToJavaObject(env, conn); - if (!jobj) goto done; + // convert into the appropriate Java type + jobject jobj = ToJavaObject(env, conn); + if (!jobj) return; - if (env->ExceptionCheck()) { - env->ExceptionDescribe(); - goto done; - } - env->CallVoidMethod(handler, mid, (jint)uid, - (jboolean)(connected ? 1 : 0), jobj); - if (env->ExceptionCheck()) env->ExceptionDescribe(); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); + return; + } + env->CallVoidMethod(handler, mid, (jint)uid, + (jboolean)(connected ? 1 : 0), jobj); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); } -done: - jvm->DetachCurrentThread(); }, immediateNotify != JNI_FALSE); } @@ -1253,49 +1271,176 @@ JNIEXPORT jlong JNICALL Java_edu_wpi_first_wpilibj_networktables_NetworkTablesJN return nt::Now(); } +} // extern "C" + +// Thread where log callbacks are actually performed. +// +// JNI's AttachCurrentThread() creates a Java Thread object on every +// invocation, which is both time inefficient and causes issues with Eclipse +// (which tries to keep a thread list up-to-date and thus gets swamped). +// +// Instead, this class attaches just once. When a hardware notification +// occurs, a condition variable wakes up this thread and this thread actually +// makes the call into Java. +class LoggerThreadJNI { + public: + static LoggerThreadJNI& GetInstance() { + ATOMIC_STATIC(LoggerThreadJNI, instance); + return instance; + } + ~LoggerThreadJNI(); + void SetFunc(JNIEnv* env, jobject func, jmethodID mid); + void Start(); + void Stop(); + + void Log(unsigned int level, const char* file, unsigned int line, + const char* msg); + + private: + void ThreadMain(); + + std::thread m_thread; + std::mutex m_mutex; + std::condition_variable m_cond; + std::atomic_bool m_active{false}; + struct LogMessage { + LogMessage(unsigned int level_, const char* file_, unsigned int line_, + const char* msg_) + : level(level_), file(file_), line(line_), msg(msg_) {} + unsigned int level; + const char* file; + unsigned int line; + std::string msg; + }; + std::queue m_queue; + std::mutex m_shutdown_mutex; + std::condition_variable m_shutdown_cv; + bool m_shutdown = false; + jobject m_func = nullptr; + jmethodID m_mid; + + ATOMIC_STATIC_DECL(LoggerThreadJNI) +}; + +LoggerThreadJNI::~LoggerThreadJNI() { + Stop(); +} + +void LoggerThreadJNI::SetFunc(JNIEnv* env, jobject func, jmethodID mid) { + std::lock_guard lock(m_mutex); + // free global reference + if (m_func) env->DeleteGlobalRef(m_func); + // create global reference + m_func = env->NewGlobalRef(func); + m_mid = mid; +} + +void LoggerThreadJNI::Start() { + { + std::lock_guard lock(m_mutex); + if (m_active) return; + m_active = true; + } + { + std::lock_guard lock(m_shutdown_mutex); + m_shutdown = false; + } + m_thread = std::thread(&LoggerThreadJNI::ThreadMain, this); +} + +void LoggerThreadJNI::Stop() { + { + std::lock_guard lock(m_mutex); + if (!m_active) return; + m_active = false; + } + m_cond.notify_one(); // wake up thread + + // join threads, with timeout + if (m_thread.joinable()) { + std::unique_lock lock(m_shutdown_mutex); + auto timeout_time = + std::chrono::steady_clock::now() + std::chrono::seconds(1); + if (m_shutdown_cv.wait_until(lock, timeout_time, + [&] { return m_shutdown; })) + m_thread.join(); + else + m_thread.detach(); // timed out, detach it + } +} + +void LoggerThreadJNI::Log(unsigned int level, const char *file, + unsigned int line, const char *msg) { + std::lock_guard lock(m_mutex); + if (!m_active) return; + m_queue.emplace(level, file, line, msg); + m_cond.notify_one(); +} + +void LoggerThreadJNI::ThreadMain() { + JNIEnv *env; + jint rs = jvm->AttachCurrentThread((void**)&env, NULL); + if (rs != JNI_OK) return; + + std::unique_lock lock(m_mutex); + while (m_active) { + m_cond.wait(lock, [&] { return !m_active || !m_queue.empty(); }); + if (!m_active) break; + while (!m_queue.empty()) { + if (!m_active) break; + auto item = std::move(m_queue.front()); + m_queue.pop(); + auto func = m_func; + auto mid = m_mid; + lock.unlock(); // don't hold mutex during callback execution + env->CallVoidMethod(func, mid, (jint)item.level, + ToJavaString(env, item.file), (jint)item.line, + ToJavaString(env, item.msg)); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); + } + lock.lock(); + } + } + + if (jvm) jvm->DetachCurrentThread(); + + // use condition variable to signal thread shutdown + { + std::lock_guard lock(m_shutdown_mutex); + m_shutdown = true; + m_shutdown_cv.notify_one(); + } +} + +extern "C" { + /* * Class: edu_wpi_first_wpilibj_networktables_NetworkTablesJNI * Method: setLogger * Signature: (Ledu/wpi/first/wpilibj/networktables/NetworkTablesJNI/LoggerFunction;I)V */ JNIEXPORT void JNICALL Java_edu_wpi_first_wpilibj_networktables_NetworkTablesJNI_setLogger - (JNIEnv *envouter, jclass, jobject func, jint minLevel) + (JNIEnv *env, jclass, jobject func, jint minLevel) { - // the shared pointer to the global will keep it around until the - // a new logger is set - auto func_global = std::make_shared>(envouter, func); - // cls is a temporary here; cannot be used within callback functor - jclass cls = envouter->GetObjectClass(func); + jclass cls = env->GetObjectClass(func); if (!cls) return; // method ids, on the other hand, are safe to retain - jmethodID mid = envouter->GetMethodID( + jmethodID mid = env->GetMethodID( cls, "apply", "(ILjava/lang/String;ILjava/lang/String;)V"); if (!mid) return; - return nt::SetLogger( - [=](unsigned int level, const char *file, unsigned int line, - const char *msg) { - // need to attach as we're coming from a separate thread here - if (!jvm) return; - JNIEnv *env; - if (jvm->AttachCurrentThread(reinterpret_cast(&env), - nullptr) != JNI_OK) - return; - if (!env || !env->functions) return; + auto& thread = LoggerThreadJNI::GetInstance(); + thread.SetFunc(env, func, mid); + thread.Start(); - { - // get the handler - auto handler = func_global->obj(); - if (!handler) goto done; // shouldn't happen, but ignore if it does - - env->CallVoidMethod(handler, mid, (jint)level, ToJavaString(env, file), - (jint)line, ToJavaString(env, msg)); - if (env->ExceptionCheck()) env->ExceptionDescribe(); - } -done: - jvm->DetachCurrentThread(); + nt::SetLogger( + [](unsigned int level, const char *file, unsigned int line, + const char *msg) { + LoggerThreadJNI::GetInstance().Log(level, file, line, msg); }, minLevel); } diff --git a/src/Notifier.cpp b/src/Notifier.cpp index f1db1b659d..fb2e8dcbc7 100644 --- a/src/Notifier.cpp +++ b/src/Notifier.cpp @@ -54,6 +54,8 @@ void Notifier::Stop() { } void Notifier::ThreadMain() { + if (m_on_start) m_on_start(); + std::unique_lock lock(m_mutex); while (m_active) { while (m_entry_notifications.empty() && m_conn_notifications.empty()) { @@ -135,6 +137,8 @@ void Notifier::ThreadMain() { } done: + if (m_on_exit) m_on_exit(); + // use condition variable to signal thread shutdown { std::lock_guard lock(m_shutdown_mutex); diff --git a/src/Notifier.h b/src/Notifier.h index 487680cf58..d10054cf95 100644 --- a/src/Notifier.h +++ b/src/Notifier.h @@ -37,6 +37,9 @@ class Notifier { bool local_notifiers() const { return m_local_notifiers; } static bool destroyed() { return s_destroyed; } + void SetOnStart(std::function on_start) { m_on_start = on_start; } + void SetOnExit(std::function on_exit) { m_on_exit = on_exit; } + unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback, unsigned int flags); @@ -105,6 +108,9 @@ class Notifier { std::condition_variable m_shutdown_cv; bool m_shutdown = false; + std::function m_on_start; + std::function m_on_exit; + ATOMIC_STATIC_DECL(Notifier) static bool s_destroyed; }; diff --git a/src/ntcore_c.cpp b/src/ntcore_c.cpp index cf16fc0c01..d7f545687f 100644 --- a/src/ntcore_c.cpp +++ b/src/ntcore_c.cpp @@ -167,6 +167,14 @@ void NT_Flush(void) { nt::Flush(); } * Callback Creation Functions */ +void NT_SetListenerOnStart(void (*on_start)(void *data), void *data) { + nt::SetListenerOnStart([=]() { on_start(data); }); +} + +void NT_SetListenerOnExit(void (*on_exit)(void *data), void *data) { + nt::SetListenerOnExit([=]() { on_exit(data); }); +} + unsigned int NT_AddEntryListener(const char *prefix, size_t prefix_len, void *data, NT_EntryListenerCallback callback, diff --git a/src/ntcore_cpp.cpp b/src/ntcore_cpp.cpp index 6e2c42b7d6..c50b3d4b64 100644 --- a/src/ntcore_cpp.cpp +++ b/src/ntcore_cpp.cpp @@ -65,6 +65,14 @@ void Flush() { * Callback Creation Functions */ +void SetListenerOnStart(std::function on_start) { + Notifier::GetInstance().SetOnStart(on_start); +} + +void SetListenerOnExit(std::function on_exit) { + Notifier::GetInstance().SetOnExit(on_exit); +} + unsigned int AddEntryListener(StringRef prefix, EntryListenerCallback callback, unsigned int flags) { Notifier& notifier = Notifier::GetInstance();