Skip to content

Commit 394f966

Browse files
committed
refactor: Add ProxyContext EventLoop* member
This commit makes mechanical changes needed to simplify an upcoming commit which replaces EventLoop* with an EventLoopRef. This change also happens to be also useful on its own so clientInvoke can detect disconnections in a non-racy way (bitcoin-core#123 (comment)) by seeing if the client Connection pointer is null while holding the event loop mutex.
1 parent c1aa2d7 commit 394f966

File tree

5 files changed

+23
-20
lines changed

5 files changed

+23
-20
lines changed

include/mp/proxy-io.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
443443
Sub::destroy(*this);
444444

445445
// FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
446-
m_context.connection->m_loop.sync([&]() {
446+
m_context.loop->sync([&]() {
447447
// Release client capability by move-assigning to temporary.
448448
{
449449
typename Interface::Client(std::move(m_client));

include/mp/proxy-types.h

+14-14
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ template <typename Client>
558558
void clientDestroy(Client& client)
559559
{
560560
if (client.m_context.connection) {
561-
client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
561+
client.m_context.loop->log() << "IPC client destroy " << typeid(client).name();
562562
} else {
563563
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
564564
}
@@ -567,7 +567,7 @@ void clientDestroy(Client& client)
567567
template <typename Server>
568568
void serverDestroy(Server& server)
569569
{
570-
server.m_context.connection->m_loop.log() << "IPC server destroy " << typeid(server).name();
570+
server.m_context.loop->log() << "IPC server destroy " << typeid(server).name();
571571
}
572572

573573
//! Entry point called by generated client code that looks like:
@@ -587,7 +587,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
587587
}
588588
if (!g_thread_context.waiter) {
589589
assert(g_thread_context.thread_name.empty());
590-
g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name);
590+
g_thread_context.thread_name = ThreadName(proxy_client.m_context.loop->m_exe_name);
591591
// If next assert triggers, it means clientInvoke is being called from
592592
// the capnp event loop thread. This can happen when a ProxyServer
593593
// method implementation that runs synchronously on the event loop
@@ -598,26 +598,26 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
598598
// declaration so the server method runs in a dedicated thread.
599599
assert(!g_thread_context.loop_thread);
600600
g_thread_context.waiter = std::make_unique<Waiter>();
601-
proxy_client.m_context.connection->m_loop.logPlain()
601+
proxy_client.m_context.loop->logPlain()
602602
<< "{" << g_thread_context.thread_name
603603
<< "} IPC client first request from current thread, constructing waiter";
604604
}
605605
ClientInvokeContext invoke_context{*proxy_client.m_context.connection, g_thread_context};
606606
std::exception_ptr exception;
607607
std::string kj_exception;
608608
bool done = false;
609-
proxy_client.m_context.connection->m_loop.sync([&]() {
609+
proxy_client.m_context.loop->sync([&]() {
610610
auto request = (proxy_client.m_client.*get_request)(nullptr);
611611
using Request = CapRequestTraits<decltype(request)>;
612612
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
613613
IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
614-
proxy_client.m_context.connection->m_loop.logPlain()
614+
proxy_client.m_context.loop->logPlain()
615615
<< "{" << invoke_context.thread_context.thread_name << "} IPC client send "
616616
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString());
617617

618-
proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then(
618+
proxy_client.m_context.loop->m_task_set->add(request.send().then(
619619
[&](::capnp::Response<typename Request::Results>&& response) {
620-
proxy_client.m_context.connection->m_loop.logPlain()
620+
proxy_client.m_context.loop->logPlain()
621621
<< "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
622622
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
623623
try {
@@ -632,7 +632,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
632632
},
633633
[&](const ::kj::Exception& e) {
634634
kj_exception = kj::str("kj::Exception: ", e).cStr();
635-
proxy_client.m_context.connection->m_loop.logPlain()
635+
proxy_client.m_context.loop->logPlain()
636636
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
637637
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
638638
done = true;
@@ -643,7 +643,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
643643
std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
644644
invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
645645
if (exception) std::rethrow_exception(exception);
646-
if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception;
646+
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
647647
}
648648

649649
//! Invoke callable `fn()` that may return void. If it does return void, replace
@@ -682,7 +682,7 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
682682
using Results = typename decltype(call_context.getResults())::Builds;
683683

684684
int req = ++server_reqs;
685-
server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " "
685+
server.m_context.loop->log() << "IPC server recv request #" << req << " "
686686
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());
687687

688688
try {
@@ -699,14 +699,14 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
699699
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
700700
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
701701
.then([&server, req](CallContext call_context) {
702-
server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
702+
server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName<Results>()
703703
<< " " << LogEscape(call_context.getResults().toString());
704704
});
705705
} catch (const std::exception& e) {
706-
server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what();
706+
server.m_context.loop->log() << "IPC server unhandled exception: " << e.what();
707707
throw;
708708
} catch (...) {
709-
server.m_context.connection->m_loop.log() << "IPC server unhandled exception";
709+
server.m_context.loop->log() << "IPC server unhandled exception";
710710
throw;
711711
}
712712
}

include/mp/proxy.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,10 @@ class EventLoopRef
7272
struct ProxyContext
7373
{
7474
Connection* connection;
75+
EventLoop* loop;
7576
CleanupList cleanup_fns;
7677

77-
ProxyContext(Connection* connection) : connection(connection) {}
78+
ProxyContext(Connection* connection);
7879
};
7980

8081
//! Base class for generated ProxyClient classes that implement a C++ interface

include/mp/type-context.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,13 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
132132
return;
133133
}
134134
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
135-
server.m_context.connection->m_loop.sync([&] {
135+
server.m_context.loop->sync([&] {
136136
auto fulfiller_dispose = kj::mv(fulfiller);
137137
fulfiller_dispose->fulfill(kj::mv(call_context));
138138
});
139139
}))
140140
{
141-
server.m_context.connection->m_loop.sync([&]() {
141+
server.m_context.loop->sync([&]() {
142142
auto fulfiller_dispose = kj::mv(fulfiller);
143143
fulfiller_dispose->reject(kj::mv(*exception));
144144
});
@@ -156,11 +156,11 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
156156
// thread.
157157
KJ_IF_MAYBE (thread_server, perhaps) {
158158
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
159-
server.m_context.connection->m_loop.log()
159+
server.m_context.loop->log()
160160
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
161161
thread.m_thread_context.waiter->post(std::move(invoke));
162162
} else {
163-
server.m_context.connection->m_loop.log()
163+
server.m_context.loop->log()
164164
<< "IPC server error request #" << req << ", missing thread to execute request";
165165
throw std::runtime_error("invalid thread handle");
166166
}

src/mp/proxy.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ bool EventLoopRef::reset(std::unique_lock<std::mutex>* lock)
6565
return done;
6666
}
6767

68+
ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{&connection->m_loop} {}
69+
6870
Connection::~Connection()
6971
{
7072
// Shut down RPC system first, since this will garbage collect Server

0 commit comments

Comments
 (0)