@@ -49,20 +49,21 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
     m_loop.log() << "Uncaught exception in daemonized task.";
 }
 
-EventLoopRef::EventLoopRef(EventLoop& loop, std::unique_lock<std::mutex>* lock) : m_loop(&loop), m_lock(lock)
+EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
 {
     auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
     m_loop->m_num_clients += 1;
 }
 
-bool EventLoopRef::reset(std::unique_lock<std::mutex>* lock)
+bool EventLoopRef::reset(Lock* lock)
 {
     bool done = false;
     if (m_loop) {
         auto loop_lock{PtrOrValue{lock ? lock : m_lock, m_loop->m_mutex}};
+        loop_lock->assert_locked(m_loop->m_mutex);
         assert(m_loop->m_num_clients > 0);
         m_loop->m_num_clients -= 1;
-        if (m_loop->done(*loop_lock)) {
+        if (m_loop->done()) {
             done = true;
             m_loop->m_cv.notify_all();
             int post_fd{m_loop->m_post_fd};
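The reset() hunk above captures the core idea of this change: EventLoop::done() no longer takes a std::unique_lock argument, the caller's lock is checked once via assert_locked(), and the old runtime owns_lock()/mutex() asserts (removed in the last hunk below) are replaced by compile-time checking. A minimal, hypothetical sketch of that checking pattern, reusing the MP_REQUIRES macro that appears later in this diff and assuming the MP_* macros expand to Clang's thread-safety attributes (the real mp/util.h definitions may differ):

    // Hypothetical sketch, not the actual libmultiprocess code.
    #include <mutex>

    #define MP_CAPABILITY(x) __attribute__((capability(x)))
    #define MP_ACQUIRE(...)  __attribute__((acquire_capability(__VA_ARGS__)))
    #define MP_RELEASE(...)  __attribute__((release_capability(__VA_ARGS__)))
    #define MP_REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__)))

    // A mutex annotated as a thread-safety "capability" so clang -Wthread-safety
    // can track which functions hold it.
    class MP_CAPABILITY("mutex") DemoMutex
    {
    public:
        void lock() MP_ACQUIRE() { m_mutex.lock(); }
        void unlock() MP_RELEASE() { m_mutex.unlock(); }
        std::mutex m_mutex;
    };

    struct DemoLoop
    {
        // Annotated the way EventLoop::done() presumably is after this commit:
        // callers must hold m_mutex, verified at compile time rather than with
        // assert(lock.owns_lock()) at run time.
        bool done() MP_REQUIRES(m_mutex) { return m_num_clients == 0; }

        void reset()
        {
            m_mutex.lock();
            bool finished = done(); // accepted: the analysis sees m_mutex held here
            (void)finished;
            m_mutex.unlock();
            // done(); // uncommenting this makes clang -Wthread-safety warn
        }

        DemoMutex m_mutex;
        int m_num_clients = 0;
    };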
@@ -133,18 +134,18 @@ Connection::~Connection()
         m_sync_cleanup_fns.pop_front();
     }
     while (!m_async_cleanup_fns.empty()) {
-        const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+        const Lock lock(m_loop->m_mutex);
         m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
         m_async_cleanup_fns.pop_front();
     }
-    std::unique_lock<std::mutex> lock(m_loop->m_mutex);
-    m_loop->startAsyncThread(lock);
+    Lock lock(m_loop->m_mutex);
+    m_loop->startAsyncThread();
     m_loop.reset(&lock);
 }
 
 CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     // Add cleanup callbacks to the front of list, so sync cleanup functions run
     // in LIFO order. This is a good approach because sync cleanup functions are
     // added as client objects are created, and it is natural to clean up
@@ -158,13 +159,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 
 void Connection::removeSyncCleanup(CleanupIt it)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     m_sync_cleanup_fns.erase(it);
 }
 
 void Connection::addAsyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     // Add async cleanup callbacks to the back of the list. Unlike the sync
     // cleanup list, this list order is more significant because it determines
     // the order server objects are destroyed when there is a sudden disconnect,
@@ -200,7 +201,7 @@ EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
 EventLoop::~EventLoop()
 {
     if (m_async_thread.joinable()) m_async_thread.join();
-    const std::lock_guard<std::mutex> lock(m_mutex);
+    const Lock lock(m_mutex);
     KJ_ASSERT(m_post_fn == nullptr);
     KJ_ASSERT(m_async_fns.empty());
     KJ_ASSERT(m_wait_fd == -1);
@@ -225,12 +226,12 @@ void EventLoop::loop()
     for (;;) {
         const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
         if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
-        std::unique_lock<std::mutex> lock(m_mutex);
+        Lock lock(m_mutex);
         if (m_post_fn) {
             Unlock(lock, *m_post_fn);
             m_post_fn = nullptr;
             m_cv.notify_all();
-        } else if (done(lock)) {
+        } else if (done()) {
             // Intentionally do not break if m_post_fn was set, even if done()
             // would return true, to ensure that the EventLoopRef write(post_fd)
             // call always succeeds and the loop does not exit between the time
@@ -243,7 +244,7 @@ void EventLoop::loop()
     log() << "EventLoop::loop bye.";
     wait_stream = nullptr;
     KJ_SYSCALL(::close(post_fd));
-    const std::unique_lock<std::mutex> lock(m_mutex);
+    const Lock lock(m_mutex);
     m_wait_fd = -1;
     m_post_fd = -1;
 }
@@ -254,26 +255,26 @@ void EventLoop::post(kj::Function<void()> fn)
         fn();
         return;
     }
-    std::unique_lock<std::mutex> lock(m_mutex);
+    Lock lock(m_mutex);
     EventLoopRef ref(*this, &lock);
-    m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
+    m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
     m_post_fn = &fn;
     int post_fd{m_post_fd};
     Unlock(lock, [&] {
         char buffer = 0;
         KJ_SYSCALL(write(post_fd, &buffer, 1));
     });
-    m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
+    m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
 }
 
-void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
+void EventLoop::startAsyncThread()
 {
     if (m_async_thread.joinable()) {
         m_cv.notify_all();
     } else if (!m_async_fns.empty()) {
         m_async_thread = std::thread([this] {
-            std::unique_lock<std::mutex> lock(m_mutex);
-            while (!done(lock)) {
+            Lock lock(m_mutex);
+            while (!done()) {
                 if (!m_async_fns.empty()) {
                     EventLoopRef ref{*this, &lock};
                     const std::function<void()> fn = std::move(m_async_fns.front());
@@ -290,17 +291,15 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
                     // Continue without waiting in case there are more async_fns
                     continue;
                 }
-                m_cv.wait(lock);
+                m_cv.wait(lock.m_lock);
             }
         });
     }
 }
 
-bool EventLoop::done(std::unique_lock<std::mutex>& lock)
+bool EventLoop::done()
 {
     assert(m_num_clients >= 0);
-    assert(lock.owns_lock());
-    assert(lock.mutex() == &m_mutex);
     return m_num_clients == 0 && m_async_fns.empty();
 }
 
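Taken together, these hunks assume a small scoped Lock type standing in for std::unique_lock<std::mutex>: it exposes the underlying unique_lock as a public m_lock member so std::condition_variable::wait() keeps working, and offers assert_locked() for the pointer-passing path in EventLoopRef::reset(). A rough, hypothetical sketch of such a wrapper, assuming MP_* macros that map to Clang's thread-safety attributes (the actual mp/util.h code may differ):

    // Hypothetical sketch of the Lock wrapper, not the actual mp/util.h code.
    #include <cassert>
    #include <mutex>

    #define MP_CAPABILITY(x)          __attribute__((capability(x)))
    #define MP_SCOPED_CAPABILITY      __attribute__((scoped_lockable))
    #define MP_ACQUIRE(...)           __attribute__((acquire_capability(__VA_ARGS__)))
    #define MP_RELEASE(...)           __attribute__((release_capability(__VA_ARGS__)))
    #define MP_ASSERT_CAPABILITY(...) __attribute__((assert_capability(__VA_ARGS__)))

    class MP_CAPABILITY("mutex") Mutex
    {
    public:
        void lock() MP_ACQUIRE() { m_mutex.lock(); }
        void unlock() MP_RELEASE() { m_mutex.unlock(); }
        std::mutex m_mutex;
    };

    // RAII lock that the thread-safety analysis treats as holding the Mutex
    // capability for its whole scope.
    class MP_SCOPED_CAPABILITY Lock
    {
    public:
        explicit Lock(Mutex& m) MP_ACQUIRE(m) : m_lock(m.m_mutex) {}
        ~Lock() MP_RELEASE() {}

        // Runtime check that this Lock really owns the given mutex, plus a hint
        // to the analyzer that the capability is held from this point on; used
        // in EventLoopRef::reset() where the lock can arrive through a pointer.
        void assert_locked(Mutex& mutex) MP_ASSERT_CAPABILITY(mutex)
        {
            assert(m_lock.mutex() == &mutex.m_mutex);
            assert(m_lock.owns_lock());
        }

        // Public so callers can keep using std::condition_variable, e.g.
        // m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) { ... });
        std::unique_lock<std::mutex> m_lock;
    };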