@@ -53,165 +53,7 @@ struct StructField
53
53
// clang-format on
54
54
};
55
55
56
- template <typename Output>
57
- void CustomBuildField (TypeList<>,
58
- Priority<1 >,
59
- ClientInvokeContext& invoke_context,
60
- Output&& output,
61
- typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
62
- {
63
- auto & connection = invoke_context.connection ;
64
- auto & thread_context = invoke_context.thread_context ;
65
-
66
- // Create local Thread::Server object corresponding to the current thread
67
- // and pass a Thread::Client reference to it in the Context.callbackThread
68
- // field so the function being called can make callbacks to this thread.
69
- // Also store the Thread::Client reference in the callback_threads map so
70
- // future calls over this connection can reuse it.
71
- auto [callback_thread, _]{SetThread (
72
- thread_context.callback_threads , thread_context.waiter ->m_mutex , &connection,
73
- [&] { return connection.m_threads .add (kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
74
-
75
- // Call remote ThreadMap.makeThread function so server will create a
76
- // dedicated worker thread to run function calls from this thread. Store the
77
- // Thread::Client reference it returns in the request_threads map.
78
- auto make_request_thread{[&]{
79
- // This code will only run if an IPC client call is being made for the
80
- // first time on this thread. After the first call, subsequent calls
81
- // will use the existing request thread. This code will also never run at
82
- // all if the current thread is a request thread created for a different
83
- // IPC client, because in that case PassField code (below) will have set
84
- // request_thread to point to the calling thread.
85
- auto request = connection.m_thread_map .makeThreadRequest ();
86
- request.setName (thread_context.thread_name );
87
- return request.send ().getResult (); // Nonblocking due to capnp request pipelining.
88
- }};
89
- auto [request_thread, _1]{SetThread (
90
- thread_context.request_threads , thread_context.waiter ->m_mutex ,
91
- &connection, make_request_thread)};
92
56
93
- auto context = output.init ();
94
- context.setThread (request_thread->second .m_client );
95
- context.setCallbackThread (callback_thread->second .m_client );
96
- }
97
-
98
- // ! PassField override for mp.Context arguments. Return asynchronously and call
99
- // ! function on other thread found in context.
100
- template <typename Accessor, typename ServerContext, typename Fn, typename ... Args>
101
- auto PassField (Priority<1 >, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
102
- typename std::enable_if<
103
- std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
104
- kj::Promise<typename ServerContext::CallContext>>::type
105
- {
106
- const auto & params = server_context.call_context .getParams ();
107
- Context::Reader context_arg = Accessor::get (params);
108
- auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
109
- auto & server = server_context.proxy_server ;
110
- int req = server_context.req ;
111
- auto invoke = MakeAsyncCallable (
112
- [fulfiller = kj::mv (future.fulfiller ),
113
- call_context = kj::mv (server_context.call_context ), &server, req, fn, args...]() mutable {
114
- const auto & params = call_context.getParams ();
115
- Context::Reader context_arg = Accessor::get (params);
116
- ServerContext server_context{server, call_context, req};
117
- bool disconnected{false };
118
- {
119
- // Before invoking the function, store a reference to the
120
- // callbackThread provided by the client in the
121
- // thread_local.request_threads map. This way, if this
122
- // server thread needs to execute any RPCs that call back to
123
- // the client, they will happen on the same client thread
124
- // that is waiting for this function, just like what would
125
- // happen if this were a normal function call made on the
126
- // local stack.
127
- //
128
- // If the request_threads map already has an entry for this
129
- // connection, it will be left unchanged, and it indicates
130
- // that the current thread is an RPC client thread which is
131
- // in the middle of an RPC call, and the current RPC call is
132
- // a nested call from the remote thread handling that RPC
133
- // call. In this case, the callbackThread value should point
134
- // to the same thread already in the map, so there is no
135
- // need to update the map.
136
- auto & thread_context = g_thread_context;
137
- auto & request_threads = thread_context.request_threads ;
138
- auto [request_thread, inserted]{SetThread (
139
- request_threads, thread_context.waiter ->m_mutex ,
140
- server.m_context .connection ,
141
- [&] { return context_arg.getCallbackThread (); })};
142
-
143
- // If an entry was inserted into the requests_threads map,
144
- // remove it after calling fn.invoke. If an entry was not
145
- // inserted, one already existed, meaning this must be a
146
- // recursive call (IPC call calling back to the caller which
147
- // makes another IPC call), so avoid modifying the map.
148
- const bool erase_thread{inserted};
149
- KJ_DEFER ({
150
- std::unique_lock<std::mutex> lock (thread_context.waiter ->m_mutex );
151
- // Call erase here with a Connection* argument instead
152
- // of an iterator argument, because the `request_thread`
153
- // iterator may be invalid if the connection is closed
154
- // during this function call. More specifically, the
155
- // iterator may be invalid because SetThread adds a
156
- // cleanup callback to the Connection destructor that
157
- // erases the thread from the map, and also because the
158
- // ProxyServer<Thread> destructor calls
159
- // request_threads.clear().
160
- if (erase_thread) {
161
- disconnected = !request_threads.erase (server.m_context .connection );
162
- } else {
163
- disconnected = !request_threads.count (server.m_context .connection );
164
- }
165
- });
166
- fn.invoke (server_context, args...);
167
- }
168
- if (disconnected) {
169
- // If disconnected is true, the Connection object was
170
- // destroyed during the method call. Deal with this by
171
- // returning without ever fulfilling the promise, which will
172
- // cause the ProxyServer object to leak. This is not ideal,
173
- // but fixing the leak will require nontrivial code changes
174
- // because there is a lot of code assuming ProxyServer
175
- // objects are destroyed before Connection objects.
176
- return ;
177
- }
178
- KJ_IF_MAYBE (exception , kj::runCatchingExceptions ([&]() {
179
- server.m_context .connection ->m_loop .sync ([&] {
180
- auto fulfiller_dispose = kj::mv (fulfiller);
181
- fulfiller_dispose->fulfill (kj::mv (call_context));
182
- });
183
- }))
184
- {
185
- server.m_context .connection ->m_loop .sync ([&]() {
186
- auto fulfiller_dispose = kj::mv (fulfiller);
187
- fulfiller_dispose->reject (kj::mv (*exception ));
188
- });
189
- }
190
- });
191
-
192
- // Lookup Thread object specified by the client. The specified thread should
193
- // be a local Thread::Server object, but it needs to be looked up
194
- // asynchronously with getLocalServer().
195
- auto thread_client = context_arg.getThread ();
196
- return server.m_context .connection ->m_threads .getLocalServer (thread_client)
197
- .then ([&server, invoke, req](const kj::Maybe<Thread::Server&>& perhaps) {
198
- // Assuming the thread object is found, pass it a pointer to the
199
- // `invoke` lambda above which will invoke the function on that
200
- // thread.
201
- KJ_IF_MAYBE (thread_server, perhaps) {
202
- const auto & thread = static_cast <ProxyServer<Thread>&>(*thread_server);
203
- server.m_context .connection ->m_loop .log ()
204
- << " IPC server post request #" << req << " {" << thread.m_thread_context .thread_name << " }" ;
205
- thread.m_thread_context .waiter ->post (std::move (invoke));
206
- } else {
207
- server.m_context .connection ->m_loop .log ()
208
- << " IPC server error request #" << req << " , missing thread to execute request" ;
209
- throw std::runtime_error (" invalid thread handle" );
210
- }
211
- })
212
- // Wait for the invocation to finish before returning to the caller.
213
- .then ([invoke_wait = kj::mv (future.promise )]() mutable { return kj::mv (invoke_wait); });
214
- }
215
57
216
58
// Destination parameter type that can be passed to ReadField function as an
217
59
// alternative to ReadDestUpdate. It allows the ReadField implementation to call
0 commit comments