@@ -49,20 +49,22 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
     m_loop.log() << "Uncaught exception in daemonized task.";
 }
 
-EventLoopRef::EventLoopRef(EventLoop& loop, std::unique_lock<std::mutex>* lock) : m_loop(&loop), m_lock(lock)
+EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
 {
     auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
+    loop_lock->assert_locked(m_loop->m_mutex);
     m_loop->m_num_clients += 1;
 }
 
-bool EventLoopRef::reset(std::unique_lock<std::mutex>* lock)
+bool EventLoopRef::reset(Lock* lock)
 {
     bool done = false;
     if (m_loop) {
         auto loop_lock{PtrOrValue{lock ? lock : m_lock, m_loop->m_mutex}};
+        loop_lock->assert_locked(m_loop->m_mutex);
         assert(m_loop->m_num_clients > 0);
         m_loop->m_num_clients -= 1;
-        if (m_loop->done(*loop_lock)) {
+        if (m_loop->done()) {
             done = true;
             m_loop->m_cv.notify_all();
             int post_fd{m_loop->m_post_fd};
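For context, the `Lock` type that replaces `std::unique_lock<std::mutex>` throughout this diff is defined elsewhere in the change. Below is a minimal sketch of the pattern, assuming (as the `lock.m_lock` arguments and `assert_locked()` calls in this diff suggest) a `Mutex`/`Lock` pair annotated for Clang's `-Wthread-safety` analysis, with the underlying `std::unique_lock` left public so condition variables can wait on it. The `MP_*` macro definitions here are illustrative guesses, not copied from the source:

```cpp
#include <cassert>
#include <mutex>

// Hypothetical reconstruction: assume the MP_* macros wrap Clang
// thread-safety attributes and expand to nothing on other compilers.
#if defined(__clang__)
#define MP_TSA(x) __attribute__((x))
#else
#define MP_TSA(x)
#endif
#define MP_CAPABILITY MP_TSA(capability("mutex"))
#define MP_SCOPED_CAPABILITY MP_TSA(scoped_lockable)
#define MP_ACQUIRE(...) MP_TSA(acquire_capability(__VA_ARGS__))
#define MP_RELEASE(...) MP_TSA(release_capability(__VA_ARGS__))
#define MP_REQUIRES(x) MP_TSA(requires_capability(x))

// std::mutex wrapper the analysis can treat as a capability.
class MP_CAPABILITY Mutex
{
public:
    void lock() MP_ACQUIRE() { m_mutex.lock(); }
    void unlock() MP_RELEASE() { m_mutex.unlock(); }
    std::mutex m_mutex;
};

// RAII lock with a runtime escape hatch (assert_locked) for places like
// EventLoopRef, where the lock can arrive through a pointer and the
// static analysis loses track of which mutex it guards.
class MP_SCOPED_CAPABILITY Lock
{
public:
    explicit Lock(Mutex& m) MP_ACQUIRE(m) : m_lock(m.m_mutex) {}
    ~Lock() MP_RELEASE() {}
    void lock() MP_ACQUIRE() { m_lock.lock(); }
    void unlock() MP_RELEASE() { m_lock.unlock(); }
    void assert_locked(Mutex& mutex)
    {
        assert(m_lock.mutex() == &mutex.m_mutex);
        assert(m_lock.owns_lock());
    }
    std::unique_lock<std::mutex> m_lock; // public so m_cv.wait(lock.m_lock, ...) works
};
```

With a shape like this, `Lock lock(m_mutex);` both takes the mutex and tells the analyzer the capability is held, which is what lets `done()` and `startAsyncThread()` drop their lock parameters further down.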
@@ -132,18 +134,18 @@ Connection::~Connection()
         m_sync_cleanup_fns.pop_front();
     }
     while (!m_async_cleanup_fns.empty()) {
-        const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+        const Lock lock(m_loop->m_mutex);
         m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
         m_async_cleanup_fns.pop_front();
     }
-    std::unique_lock<std::mutex> lock(m_loop->m_mutex);
-    m_loop->startAsyncThread(lock);
+    Lock lock(m_loop->m_mutex);
+    m_loop->startAsyncThread();
     m_loop.reset(&lock);
 }
 
 CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     // Add cleanup callbacks to the front of list, so sync cleanup functions run
     // in LIFO order. This is a good approach because sync cleanup functions are
     // added as client objects are created, and it is natural to clean up
@@ -157,13 +159,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 
 void Connection::removeSyncCleanup(CleanupIt it)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     m_sync_cleanup_fns.erase(it);
 }
 
 void Connection::addAsyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     // Add async cleanup callbacks to the back of the list. Unlike the sync
     // cleanup list, this list order is more significant because it determines
     // the order server objects are destroyed when there is a sudden disconnect,
@@ -199,7 +201,7 @@ EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
 EventLoop::~EventLoop()
 {
     if (m_async_thread.joinable()) m_async_thread.join();
-    const std::lock_guard<std::mutex> lock(m_mutex);
+    const Lock lock(m_mutex);
     KJ_ASSERT(m_post_fn == nullptr);
     KJ_ASSERT(m_async_fns.empty());
     KJ_ASSERT(m_wait_fd == -1);
@@ -224,12 +226,12 @@ void EventLoop::loop()
     for (;;) {
         const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
         if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
-        std::unique_lock<std::mutex> lock(m_mutex);
+        Lock lock(m_mutex);
         if (m_post_fn) {
             Unlock(lock, *m_post_fn);
             m_post_fn = nullptr;
             m_cv.notify_all();
-        } else if (done(lock)) {
+        } else if (done()) {
             // Intentionally do not break if m_post_fn was set, even if done()
             // would return true, to ensure that the EventLoopRef write(post_fd)
             // call always succeeds and the loop does not exit between the time
@@ -242,7 +244,7 @@ void EventLoop::loop()
     log() << "EventLoop::loop bye.";
     wait_stream = nullptr;
     KJ_SYSCALL(::close(post_fd));
-    const std::unique_lock<std::mutex> lock(m_mutex);
+    const Lock lock(m_mutex);
     m_wait_fd = -1;
     m_post_fd = -1;
 }
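`Unlock()` (used with `*m_post_fn` above and again in `post()` below) temporarily drops the lock while a callback runs. Its definition is not part of this diff; here is a plausible sketch against the hypothetical `Lock` from earlier, which may differ from the real helper:

```cpp
// Hypothetical sketch: release the Lock, run the callback, then reacquire.
// The real helper might use an RAII guard instead of try/catch.
template <typename Callback>
void Unlock(Lock& lock, Callback&& callback)
{
    lock.unlock();
    try {
        callback();
    } catch (...) {
        lock.lock(); // reacquire even if the callback throws
        throw;
    }
    lock.lock();
}
```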
@@ -253,26 +255,26 @@ void EventLoop::post(kj::Function<void()> fn)
         fn();
         return;
     }
-    std::unique_lock<std::mutex> lock(m_mutex);
+    Lock lock(m_mutex);
     EventLoopRef ref(*this, &lock);
-    m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
+    m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
     m_post_fn = &fn;
     int post_fd{m_post_fd};
     Unlock(lock, [&] {
         char buffer = 0;
         KJ_SYSCALL(write(post_fd, &buffer, 1));
     });
-    m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
+    m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
 }
 
-void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
+void EventLoop::startAsyncThread()
 {
     if (m_async_thread.joinable()) {
         m_cv.notify_all();
     } else if (!m_async_fns.empty()) {
         m_async_thread = std::thread([this] {
-            std::unique_lock<std::mutex> lock(m_mutex);
-            while (!done(lock)) {
+            Lock lock(m_mutex);
+            while (!done()) {
                 if (!m_async_fns.empty()) {
                     EventLoopRef ref{*this, &lock};
                     const std::function<void()> fn = std::move(m_async_fns.front());
@@ -289,17 +291,15 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
                     // Continue without waiting in case there are more async_fns
                     continue;
                 }
-                m_cv.wait(lock);
+                m_cv.wait(lock.m_lock);
             }
         });
     }
 }
 
-bool EventLoop::done(std::unique_lock<std::mutex>& lock)
+bool EventLoop::done()
 {
     assert(m_num_clients >= 0);
-    assert(lock.owns_lock());
-    assert(lock.mutex() == &m_mutex);
     return m_num_clients == 0 && m_async_fns.empty();
 }
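The deleted `owns_lock()`/`mutex()` asserts are the point of the exercise: with `done()` and `startAsyncThread()` presumably declared `MP_REQUIRES(m_mutex)` in the header, the same check moves from runtime to compile time. A sketch of how callers would be diagnosed, reusing the assumed macros and `Lock`/`Mutex` types from the first sketch (annotation placement and trivial bodies are guesses):

```cpp
// Hypothetical declarations mirroring this diff.
class EventLoop
{
public:
    bool done() MP_REQUIRES(m_mutex) { return true; } // body elided in sketch
    void startAsyncThread() MP_REQUIRES(m_mutex) {}   // body elided in sketch
    Mutex m_mutex;
};

void broken(EventLoop& loop)
{
    loop.done(); // clang -Wthread-safety: requires holding mutex 'loop.m_mutex'
}

void ok(EventLoop& loop)
{
    Lock lock(loop.m_mutex); // acquires and annotates the capability
    loop.done();             // fine: analysis sees m_mutex held
}
```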