allwpilib/src/RpcServer.h
Peter Johnson fef8f933d9 Add SafeThread to fix thread JNI shutdown races.
During JVM shutdown, some JNI calls may not return, so it's not possible to
reliably perform a join() during static variable destruction (which occurs
as the JVM unloads the JNI module).

Also, due to static variable destruction, it's not safe to use any members
of a static class instance from a separate thread of execution.

SafeThread is a templated thread class and a related owner class that's
designed for safe operation and shutdown of threads in the presence of
callbacks that may not return.  It also passes ownership of variables from
the static instance to the thread, so the thread can safely operate until
it exits (the last operation of the thread being to destroy its instance).
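
The ownership handoff described above can be sketched roughly as follows.
This is a minimal illustration, not the actual SafeThread implementation:
SharedState, DetachedWorker, Notify, state, and WaitForExit are hypothetical
names, and the sketch assumes a detached std::thread that holds its state
through a std::shared_ptr so the owner's destructor never needs to join().

```cpp
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical illustration only; these are not the real SafeThread classes.

// State shared by the owner and the thread. The shared_ptr keeps it alive
// until the last holder releases it, which is normally the thread itself.
struct SharedState {
  std::mutex mutex;
  std::condition_variable cond;
  bool active = true;   // cleared by the owner to request shutdown
  bool exited = false;  // set by the thread just before it returns
  int pending = 0;      // work items waiting to be handled
  int handled = 0;      // work items the thread has handled
};

class DetachedWorker {
 public:
  DetachedWorker() : m_state(std::make_shared<SharedState>()) {
    // The thread captures its own shared_ptr copy, so it never reads
    // members of DetachedWorker (which may be a static being destroyed).
    std::shared_ptr<SharedState> state = m_state;
    std::thread([state] {
      std::unique_lock<std::mutex> lock(state->mutex);
      for (;;) {
        state->cond.wait(
            lock, [&] { return !state->active || state->pending > 0; });
        state->handled += state->pending;  // "handle" all queued work
        state->pending = 0;
        if (!state->active) break;
      }
      state->exited = true;
      state->cond.notify_all();
    }).detach();
  }

  // Requests shutdown and returns immediately; there is no join() here.
  ~DetachedWorker() {
    {
      std::lock_guard<std::mutex> lock(m_state->mutex);
      m_state->active = false;
    }
    m_state->cond.notify_all();
  }

  // Queues one unit of work and wakes the thread.
  void Notify() {
    {
      std::lock_guard<std::mutex> lock(m_state->mutex);
      ++m_state->pending;
    }
    m_state->cond.notify_all();
  }

  std::shared_ptr<SharedState> state() const { return m_state; }

 private:
  std::shared_ptr<SharedState> m_state;
};

// Waits until the thread reports that it has exited, or the timeout expires.
bool WaitForExit(const std::shared_ptr<SharedState>& state,
                 std::chrono::milliseconds timeout) {
  std::unique_lock<std::mutex> lock(state->mutex);
  return state->cond.wait_for(lock, timeout, [&] { return state->exited; });
}
```

Because the destructor only clears a flag and notifies, it cannot block on a
thread that is stuck in a callback; the state is freed when the last
shared_ptr copy goes away.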

Notifiers, RpcServer, and Logger now use SafeThread to ensure race-free
destruction in both C++ and Java.

All Java callback threads are now marked as Java daemon threads so they
don't keep the JVM running after main() terminates.

All Java callback threads are now named so their purpose is more easily
identified in a debugger.

Add SetRpcServerOnStart and SetRpcServerOnExit (similar to Listener).
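
The header in this commit shows only that SetOnStart and SetOnExit store a
std::function<void()>; how the thread invokes them is not shown. The sketch
below assumes the hooks run once at the start and once at the end of the
callback thread. HookedRunner, Run, and TraceHooks are hypothetical names,
and the thread is joined here purely to keep the example deterministic.

```cpp
#include <functional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a class that runs a callback thread.
class HookedRunner {
 public:
  void SetOnStart(std::function<void()> on_start) {
    m_on_start = std::move(on_start);
  }
  void SetOnExit(std::function<void()> on_exit) {
    m_on_exit = std::move(on_exit);
  }

  // Runs body on a new thread, bracketed by the hooks, and waits for it.
  void Run(std::function<void()> body) {
    // Copy the hooks into the thread so it does not read members of *this.
    std::function<void()> on_start = m_on_start;
    std::function<void()> on_exit = m_on_exit;
    std::thread t([on_start, on_exit, body] {
      if (on_start) on_start();  // assumed: first thing the thread does
      body();
      if (on_exit) on_exit();    // assumed: last thing the thread does
    });
    t.join();
  }

 private:
  std::function<void()> m_on_start;
  std::function<void()> m_on_exit;
};

// Records the order in which the hooks and the body run.
std::vector<std::string> TraceHooks() {
  std::vector<std::string> trace;
  HookedRunner runner;
  runner.SetOnStart([&trace] { trace.push_back("start"); });
  runner.SetOnExit([&trace] { trace.push_back("exit"); });
  runner.Run([&trace] { trace.push_back("body"); });
  return trace;
}
```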
2015-12-28 17:51:56 -08:00


/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved. */
/* Open Source Software - may be modified and shared by FRC teams. The code */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project. */
/*----------------------------------------------------------------------------*/
#ifndef NT_RPCSERVER_H_
#define NT_RPCSERVER_H_
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include "llvm/DenseMap.h"
#include "atomic_static.h"
#include "Message.h"
#include "ntcore_cpp.h"
#include "SafeThread.h"
namespace nt {

class RpcServer {
  friend class RpcServerTest;

 public:
  static RpcServer& GetInstance() {
    ATOMIC_STATIC(RpcServer, instance);
    return instance;
  }
  ~RpcServer();

  typedef std::function<void(std::shared_ptr<Message>)> SendMsgFunc;

  void Start();
  void Stop();

  void SetOnStart(std::function<void()> on_start) { m_on_start = on_start; }
  void SetOnExit(std::function<void()> on_exit) { m_on_exit = on_exit; }

  void ProcessRpc(StringRef name, std::shared_ptr<Message> msg,
                  RpcCallback func, unsigned int conn_id,
                  SendMsgFunc send_response);

  bool PollRpc(bool blocking, RpcCallInfo* call_info);
  void PostRpcResponse(unsigned int rpc_id, unsigned int call_uid,
                       llvm::StringRef result);

 private:
  RpcServer();

  class Thread;
  SafeThreadOwner<Thread> m_owner;

  struct RpcCall {
    RpcCall(StringRef name_, std::shared_ptr<Message> msg_, RpcCallback func_,
            unsigned int conn_id_, SendMsgFunc send_response_)
        : name(name_),
          msg(msg_),
          func(func_),
          conn_id(conn_id_),
          send_response(send_response_) {}

    std::string name;
    std::shared_ptr<Message> msg;
    RpcCallback func;
    unsigned int conn_id;
    SendMsgFunc send_response;
  };

  std::mutex m_mutex;

  std::queue<RpcCall> m_poll_queue;
  llvm::DenseMap<std::pair<unsigned int, unsigned int>, SendMsgFunc>
      m_response_map;

  std::condition_variable m_poll_cond;

  std::atomic_bool m_terminating;

  std::function<void()> m_on_start;
  std::function<void()> m_on_exit;

  ATOMIC_STATIC_DECL(RpcServer)
};

}  // namespace nt
#endif // NT_RPCSERVER_H_
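
The m_poll_queue, m_poll_cond, and m_terminating members suggest that
PollRpc(blocking, ...) waits on a condition variable until a call is queued
or the server is terminating. The implementation is not part of this header,
so the following is only a sketch under that assumption. PollQueue, Push,
Poll, and Terminate are hypothetical names, and a std::string stands in for
the queued RpcCall.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for the poll queue, its condition variable, and the
// terminating flag; not the actual RpcServer implementation.
class PollQueue {
 public:
  // Queues one call and wakes a single waiting poller.
  void Push(std::string call) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_queue.push(std::move(call));
    }
    m_cond.notify_one();
  }

  // Returns true and fills *out if a call was available. When blocking is
  // true, waits until a call arrives or Terminate() is called.
  bool Poll(bool blocking, std::string* out) {
    std::unique_lock<std::mutex> lock(m_mutex);
    while (m_queue.empty()) {
      if (!blocking || m_terminating) return false;
      m_cond.wait(lock);
    }
    *out = std::move(m_queue.front());
    m_queue.pop();
    return true;
  }

  // Wakes every blocked poller so that none is left waiting at shutdown.
  // The flag is set under the mutex to avoid a lost wakeup.
  void Terminate() {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_terminating = true;
    }
    m_cond.notify_all();
  }

 private:
  std::mutex m_mutex;
  std::condition_variable m_cond;
  std::queue<std::string> m_queue;
  bool m_terminating = false;
};
```

A non-blocking poll on an empty queue returns false at once, and a blocking
poll returns false as soon as Terminate() has been called and the queue is
empty.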