Commit 0014175

Fix a rare pthreads main thread deadlock that worsened in 2.0.2 (#12318)
The deadlock was introduced in #12055, which was released in 2.0.2. However, the underlying issue is older: that change only made an existing deadlock more frequent.

Background: Our pthreads implementation has some extra complexity because of how the main thread works on the web. Specifically, we can't block there, so we can't do Atomics.wait there. Instead, we do what is in effect a busy-wait, and to make that as cheap as possible we special-case the main thread. There is a memory address, the "main thread futex address", which holds the futex the main thread is waiting on, or 0 if none. When a thread runs futex_wake, it first checks whether the main thread is waiting on that futex by checking that address, and if it is, we notify the main thread first, before any workers. In effect this gives the main thread priority. Using that address, the main thread can busy-wait, polling until a pthread sets it to 0, which indicates the wait can stop.

This broke in #12055 because it erroneously ended up setting the main thread futex address only on the main thread. Workers did not receive that address; they got a JS undefined instead, so they would not wake up the main thread.

It is actually a lot more complicated than that, because futex_wait can recurse. While it runs the busy-wait loop it also runs the event loop, so that it does not starve pthreads and cause deadlocks: it checks whether there are incoming proxied calls and runs them. To run the event loop we must take a mutex. With the right timing we can hit the slow paths in both the original mutex (leading to a call to futex_wait instead of a quick atomic operation) and in the proxying mutex, and then we get this recursion. The old code did not handle this correctly, in two ways.

First, it did not clear the main thread futex address when futex_wait exited due to a timeout, so it looked like we were still waiting. A later futex_wake on that futex would unset the main thread futex address and count that as one of the waiters it woke up. But nothing was actually woken there, so it ended up not waking anyone (if it was told to wake just 1 waiter).

Second, futex_wake did not check whether it was on the main thread. The main thread should never wake itself up, but it did, which made things more confusing (I'm not sure if this caused deadlocks by itself, though).

A possible third issue is that when recursing in that way the main thread can in effect be waiting on two futexes, A and then B. If a worker wants to wake up A while we are in the recursed call that set B, it doesn't see A in the main thread futex address: there is just one such address, and B has overwritten it. The old code would actually just stop waiting in this case, since it looped while the address was the one it had set. After the nested wait wrote B, on returning to the first loop we'd see 0 (if the wait for B ended cleanly) or B (if we timed out and, due to the bug mentioned earlier, erroneously left B as the main thread futex address). Neither is A, so we'd stop. That wasn't correct, though: it's possible we should still be waiting for A.

To fix all this, this PR does two things:

* Allocate the main thread futex address in C, so that it is properly accessible in workers.
* Change how futex_wait works: it sets the main thread futex address and unsets it before exiting. It also unsets it before doing a nested call, so that the two waits cannot interfere with each other.

This also adds a lot of assertions in that code, which would have caught the previous bug (I also verified that this happens on our test suite), so we have decent coverage.

The one tricky thing here is handling the case described above, with futexes A and B that we wait for in a nested manner. When nested, we wait until B is done, then return to the loop for A. How can we tell whether A is done or not? The pthread waking it up may have already had its chance to wake it. The solution taken here is to read the futex's memory value, exactly as when starting futex_wait. That is, futex_wait checks whether the address holds the expected value, and waits only if it does. I believe it is valid to check it again later as well, since the result is the same as if the main thread had been busy with other work and then checked the value later, after the other thread had made progress. See the comment in the source for more details.

This fixes a test that was wrong in how it handled a race condition, and adds a disabled test that was very useful for manual debugging here. That test basically hammers on all the proxying and locking mechanisms at once. It's not a small test, but I couldn't find a way to simplify it, and it's just for manual debugging.
1 parent 2414b0d commit 0014175

7 files changed: +235 -33 lines changed

src/library_pthread.js

+119-20
@@ -9,7 +9,6 @@ var LibraryPThread = {
   $PThread__deps: ['$registerPthreadPtr',
                    '$ERRNO_CODES', 'emscripten_futex_wake', '$killThread',
                    '$cancelThread', '$cleanupThread',
-                   '_main_thread_futex_wait_address'
 #if USE_ASAN || USE_LSAN
                    , '$withBuiltinMalloc'
 #endif
@@ -28,6 +27,9 @@ var LibraryPThread = {
     runningWorkers: [],
     // Points to a pthread_t structure in the Emscripten main heap, allocated on demand if/when first needed.
     // mainThreadBlock: undefined,
+    // Stores the memory address that the main thread is waiting on, if any. If
+    // the main thread is waiting, we wake it up before waking up any workers.
+    // mainThreadFutex: undefined,
     initMainThreadBlock: function() {
 #if ASSERTIONS
       assert(!ENVIRONMENT_IS_PTHREAD);
@@ -69,7 +71,7 @@ var LibraryPThread = {
       Atomics.store(HEAPU32, (PThread.mainThreadBlock + {{{ C_STRUCTS.pthread.tid }}} ) >> 2, PThread.mainThreadBlock); // Main thread ID.
       Atomics.store(HEAPU32, (PThread.mainThreadBlock + {{{ C_STRUCTS.pthread.pid }}} ) >> 2, {{{ PROCINFO.pid }}}); // Process ID.

-      __main_thread_futex_wait_address = _malloc(4);
+      PThread.initShared();

 #if PTHREADS_PROFILING
       PThread.createProfilerBlock(PThread.mainThreadBlock);
@@ -87,6 +89,7 @@ var LibraryPThread = {
       _emscripten_register_main_browser_thread_id(PThread.mainThreadBlock);
     },
     initWorker: function() {
+      PThread.initShared();
 #if EMBIND
       // Embind must initialize itself on all threads, as it generates support JS.
       Module['___embind_register_native_and_builtin_types']();
@@ -106,6 +109,12 @@ var LibraryPThread = {
       PThread['setThreadStatus'] = PThread.setThreadStatus;
       PThread['threadCancel'] = PThread.threadCancel;
       PThread['threadExit'] = PThread.threadExit;
+#endif
+    },
+    initShared: function() {
+      PThread.mainThreadFutex = Module['_main_thread_futex'];
+#if ASSERTIONS
+      assert(PThread.mainThreadFutex > 0);
 #endif
     },
     // Maps pthread_t to pthread info objects
@@ -1115,14 +1124,12 @@ var LibraryPThread = {
     return 0;
   },

-  // Stores the memory address that the main thread is waiting on, if any.
-  _main_thread_futex_wait_address: '0',
-
   // Returns 0 on success, or one of the values -ETIMEDOUT, -EWOULDBLOCK or -EINVAL on error.
-  emscripten_futex_wait__deps: ['_main_thread_futex_wait_address', 'emscripten_main_thread_process_queued_calls'],
+  emscripten_futex_wait__deps: ['emscripten_main_thread_process_queued_calls'],
   emscripten_futex_wait: function(addr, val, timeout) {
     if (addr <= 0 || addr > HEAP8.length || addr&3 != 0) return -{{{ cDefine('EINVAL') }}};
-    if (ENVIRONMENT_IS_NODE || ENVIRONMENT_IS_WORKER) {
+    // We can do a normal blocking wait anywhere but on the main browser thread.
+    if (!ENVIRONMENT_IS_WEB) {
 #if PTHREADS_PROFILING
       PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
 #endif
@@ -1135,31 +1142,114 @@ var LibraryPThread = {
       if (ret === 'ok') return 0;
       throw 'Atomics.wait returned an unexpected value ' + ret;
     } else {
-      // Atomics.wait is not available in the main browser thread, so simulate it via busy spinning.
-      var loadedVal = Atomics.load(HEAP32, addr >> 2);
-      if (val != loadedVal) return -{{{ cDefine('EWOULDBLOCK') }}};
+      // First, check if the value is correct for us to wait on.
+      if (Atomics.load(HEAP32, addr >> 2) != val) {
+        return -{{{ cDefine('EWOULDBLOCK') }}};
+      }

+      // Atomics.wait is not available in the main browser thread, so simulate it via busy spinning.
       var tNow = performance.now();
       var tEnd = tNow + timeout;

 #if PTHREADS_PROFILING
       PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
 #endif
+      // Register globally which address the main thread is simulating to be
+      // waiting on. When zero, the main thread is not waiting on anything, and on
+      // nonzero, the contents of the address pointed by PThread.mainThreadFutex
+      // tell which address the main thread is simulating its wait on.
+      // We need to be careful of recursion here: If we wait on a futex, and
+      // then call _emscripten_main_thread_process_queued_calls() below, that
+      // will call code that takes the proxying mutex - which can once more
+      // reach this code in a nested call. To avoid interference between the
+      // two (there is just a single mainThreadFutex at a time), unmark
+      // ourselves before calling the potentially-recursive call. See below for
+      // how we handle the case of our futex being notified during the time in
+      // between when we are not set as the value of mainThreadFutex.
+#if ASSERTIONS
+      assert(PThread.mainThreadFutex > 0);
+#endif
+      var lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, addr);
+#if ASSERTIONS
+      // We must not have already been waiting.
+      assert(lastAddr == 0);
+#endif

-      // Register globally which address the main thread is simulating to be waiting on. When zero, main thread is not waiting on anything,
-      // and on nonzero, the contents of address pointed by __main_thread_futex_wait_address tell which address the main thread is simulating its wait on.
-      Atomics.store(HEAP32, __main_thread_futex_wait_address >> 2, addr);
-      var ourWaitAddress = addr; // We may recursively re-enter this function while processing queued calls, in which case we'll do a spurious wakeup of the older wait operation.
-      while (addr == ourWaitAddress) {
+      while (1) {
+        // Check for a timeout.
         tNow = performance.now();
         if (tNow > tEnd) {
 #if PTHREADS_PROFILING
           PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
+#endif
+          // We timed out, so stop marking ourselves as waiting.
+          lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, 0);
+#if ASSERTIONS
+          // The current value must have been our address which we set, or
+          // in a race it was set to 0 which means another thread just allowed
+          // us to run, but (tragically) that happened just a bit too late.
+          assert(lastAddr == addr || lastAddr == 0);
 #endif
           return -{{{ cDefine('ETIMEDOUT') }}};
         }
-        _emscripten_main_thread_process_queued_calls(); // We are performing a blocking loop here, so must pump any pthreads if they want to perform operations that are proxied.
-        addr = Atomics.load(HEAP32, __main_thread_futex_wait_address >> 2); // Look for a worker thread waking us up.
+        // We are performing a blocking loop here, so we must handle proxied
+        // events from pthreads, to avoid deadlocks.
+        // Note that we have to do so carefully, as we may take a lock while
+        // doing so, which can recurse into this function; stop marking
+        // ourselves as waiting while we do so.
+        lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, 0);
+#if ASSERTIONS
+        assert(lastAddr == addr || lastAddr == 0);
+#endif
+        if (lastAddr == 0) {
+          // We were told to stop waiting, so stop.
+          break;
+        }
+        _emscripten_main_thread_process_queued_calls();
+
+        // Check the value, as if we were starting the futex all over again.
+        // This handles the following case:
+        //
+        //  * wait on futex A
+        //  * recurse into emscripten_main_thread_process_queued_calls(),
+        //    which waits on futex B. that sets the mainThreadFutex address to
+        //    futex B, and there is no longer any mention of futex A.
+        //  * a worker is done with futex A. it checks mainThreadFutex but does
+        //    not see A, so it does nothing special for the main thread.
+        //  * a worker is done with futex B. it flips mainThreadFutex from B
+        //    to 0, ending the wait on futex B.
+        //  * we return to the wait on futex A. mainThreadFutex is 0, but that
+        //    is because of futex B being done - we can't tell from
+        //    mainThreadFutex whether A is done or not. therefore, check the
+        //    memory value of the futex.
+        //
+        // That case motivates the design here. Given that, checking the memory
+        // address is also necessary for other reasons: we unset and re-set our
+        // address in mainThreadFutex around calls to
+        // emscripten_main_thread_process_queued_calls(), and a worker could
+        // attempt to wake us up right before/after such times.
+        //
+        // Note that checking the memory value of the futex is valid to do: we
+        // could easily have been delayed (relative to the worker holding on
+        // to futex A), which means we could be starting all of our work at the
+        // later time when there is no need to block. The only "odd" thing is
+        // that we may have caused side effects in that "delay" time. But the
+        // only side effects we can have are to call
+        // emscripten_main_thread_process_queued_calls(). That is always ok to
+        // do on the main thread (it's why it is ok for us to call it in the
+        // middle of this function, and elsewhere). So if we check the value
+        // here and return, it's the same as if what happened on the main thread
+        // was the same as calling emscripten_main_thread_process_queued_calls()
+        // a few times before calling emscripten_futex_wait().
+        if (Atomics.load(HEAP32, addr >> 2) != val) {
+          return -{{{ cDefine('EWOULDBLOCK') }}};
+        }
+
+        // Mark us as waiting once more, and continue the loop.
+        lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, addr);
+#if ASSERTIONS
+        assert(lastAddr == 0);
+#endif
       }
 #if PTHREADS_PROFILING
       PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
@@ -1170,7 +1260,6 @@ var LibraryPThread = {

   // Returns the number of threads (>= 0) woken up, or the value -EINVAL on error.
   // Pass count == INT_MAX to wake up all threads.
-  emscripten_futex_wake__deps: ['_main_thread_futex_wait_address'],
   emscripten_futex_wake: function(addr, count) {
     if (addr <= 0 || addr > HEAP8.length || addr&3 != 0 || count < 0) return -{{{ cDefine('EINVAL') }}};
     if (count == 0) return 0;
@@ -1181,10 +1270,20 @@ var LibraryPThread = {
     // See if main thread is waiting on this address? If so, wake it up by resetting its wake location to zero.
     // Note that this is not a fair procedure, since we always wake main thread first before any workers, so
     // this scheme does not adhere to real queue-based waiting.
-    var mainThreadWaitAddress = Atomics.load(HEAP32, __main_thread_futex_wait_address >> 2);
+#if ASSERTIONS
+    assert(PThread.mainThreadFutex > 0);
+#endif
+    var mainThreadWaitAddress = Atomics.load(HEAP32, PThread.mainThreadFutex >> 2);
     var mainThreadWoken = 0;
     if (mainThreadWaitAddress == addr) {
-      var loadedAddr = Atomics.compareExchange(HEAP32, __main_thread_futex_wait_address >> 2, mainThreadWaitAddress, 0);
+#if ASSERTIONS
+      // We only use mainThreadFutex on the main browser thread, where we
+      // cannot block while we wait. Therefore we should only see it set from
+      // other threads, and not on the main thread itself. In other words, the
+      // main thread must never try to wake itself up!
+      assert(!ENVIRONMENT_IS_WEB);
+#endif
+      var loadedAddr = Atomics.compareExchange(HEAP32, PThread.mainThreadFutex >> 2, mainThreadWaitAddress, 0);
       if (loadedAddr == mainThreadWaitAddress) {
         --count;
         mainThreadWoken = 1;

src/worker.js

+1-1
@@ -244,7 +244,7 @@ this.onmessage = function(e) {
     }
   } catch(ex) {
     err('worker.js onmessage() captured an uncaught exception: ' + ex);
-    if (ex.stack) err(ex.stack);
+    if (ex && ex.stack) err(ex.stack);
     throw ex;
   }
 };

system/lib/pthread/library_pthread.c

+5
@@ -913,6 +913,11 @@ int llvm_atomic_load_add_i32_p0i32(int* ptr, int delta) {
   return emscripten_atomic_add_u32(ptr, delta);
 }

+// Stores the memory address that the main thread is waiting on, if any. If
+// the main thread is waiting, we wake it up before waking up any workers.
+EMSCRIPTEN_KEEPALIVE
+void* main_thread_futex;
+
 typedef struct main_args {
   int argc;
   char** argv;
@@ -0,0 +1,75 @@
+#include <errno.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <emscripten.h>
+#include <stdio.h>
+
+class random_device
+{
+    int __f_;
+public:
+    // constructors
+    explicit random_device();
+    ~random_device();
+
+    // generating functions
+    unsigned operator()();
+};
+
+random_device::random_device()
+{
+    __f_ = open("/dev/urandom", O_RDONLY);
+    if (__f_ < 0) abort();
+}
+
+random_device::~random_device()
+{
+    close(__f_);
+}
+
+unsigned
+random_device::operator()()
+{
+    unsigned r;
+    size_t n = sizeof(r);
+    char* p = reinterpret_cast<char*>(&r);
+    while (n > 0)
+    {
+        ssize_t s = read(__f_, p, 1);
+        if (s == 0) abort();
+        if (s == -1)
+        {
+            if (errno != EINTR) abort();
+            continue;
+        }
+        n -= static_cast<size_t>(s);
+        p += static_cast<size_t>(s);
+    }
+    return r;
+}
+
+int main() {
+  int total = 0;
+  for (int i = 0; i < ITERATIONS; i++) {
+    // printf causes proxying
+    printf("%d %d\n", i, total);
+    for (int j = 0; j < 1024; j++) {
+      // allocation uses a mutex
+      auto* rd = new random_device();
+      // reading data causes proxying
+      for (int k = 0; k < 4; k++) {
+        total += (*rd)();
+      }
+      // make sure the optimizer doesn't remove the allocation
+      EM_ASM({ out("iter") }, rd);
+      delete rd;
+    }
+  }
+  printf("done: %d", total);
+#ifdef REPORT_RESULT
+  REPORT_RESULT(0);
+#endif
+  return 0;
+}
+

tests/pthread/test_pthread_proxying_in_futex_wait.cpp

+6-2
@@ -3,6 +3,7 @@
 // University of Illinois/NCSA Open Source License. Both these licenses can be
 // found in the LICENSE file.

+#include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <pthread.h>
@@ -47,9 +48,12 @@ int main()
   int rc = pthread_create(&thread, &attr, ThreadMain, 0);
   assert(rc == 0);
   rc = emscripten_futex_wait(&main_thread_wait_val, 1, 15 * 1000);
-  if (rc != 0)
+  // An rc of 0 means no error, and of EWOULDBLOCK means that the value is
+  // not the expected one, which can happen if the pthread manages to set it
+  // before we reach the futex_wait.
+  if (rc != 0 && rc != -EWOULDBLOCK)
   {
-    printf("ERROR! futex wait timed out!\n");
+    printf("ERROR! futex wait errored %d!\n", rc);
     result = 2;
 #ifdef REPORT_RESULT
     REPORT_RESULT(result);

tests/runner.py

+10-10
@@ -1356,12 +1356,12 @@ def assert_out_queue_empty(self, who):
         self.harness_out_queue.get()
       raise Exception('excessive responses from %s' % who)

-  # @param tries_left: how many more times to try this test, if it fails. browser tests have
-  #                    many more causes of flakiness (in particular, they do not run
-  #                    synchronously, so we have a timeout, which can be hit if the VM
-  #                    we run on stalls temporarily), so we let each test try more than
-  #                    once by default
-  def run_browser(self, html_file, message, expectedResult=None, timeout=None, tries_left=1):
+  # @param extra_tries: how many more times to try this test, if it fails. browser tests have
+  #                     many more causes of flakiness (in particular, they do not run
+  #                     synchronously, so we have a timeout, which can be hit if the VM
+  #                     we run on stalls temporarily), so we let each test try more than
+  #                     once by default
+  def run_browser(self, html_file, message, expectedResult=None, timeout=None, extra_tries=1):
     if not has_browser():
       return
     if BrowserCore.unresponsive_tests >= BrowserCore.MAX_UNRESPONSIVE_TESTS:
@@ -1400,10 +1400,10 @@ def run_browser(self, html_file, message, expectedResult=None, timeout=None, tri
         try:
           self.assertIdenticalUrlEncoded(expectedResult, output)
         except Exception as e:
-          if tries_left > 0:
+          if extra_tries > 0:
             print('[test error (see below), automatically retrying]')
             print(e)
-            return self.run_browser(html_file, message, expectedResult, timeout, tries_left - 1)
+            return self.run_browser(html_file, message, expectedResult, timeout, extra_tries - 1)
           else:
             raise e
       finally:
@@ -1541,7 +1541,7 @@ def btest(self, filename, expected=None, reference=None, force_c=False,
             reference_slack=0, manual_reference=False, post_build=None,
             args=[], outfile='test.html', message='.', also_proxied=False,
             url_suffix='', timeout=None, also_asmjs=False,
-            manually_trigger_reftest=False):
+            manually_trigger_reftest=False, extra_tries=1):
     assert expected or reference, 'a btest must either expect an output, or have a reference image'
     # if we are provided the source and not a path, use that
     filename_is_src = '\n' in filename
@@ -1575,7 +1575,7 @@ def btest(self, filename, expected=None, reference=None, force_c=False,
       post_build()
     if not isinstance(expected, list):
       expected = [expected]
-    self.run_browser(outfile + url_suffix, message, ['/report_result?' + e for e in expected], timeout=timeout)
+    self.run_browser(outfile + url_suffix, message, ['/report_result?' + e for e in expected], timeout=timeout, extra_tries=extra_tries)

     # Tests can opt into being run under asmjs as well
     if 'WASM=0' not in args and (also_asmjs or self.also_asmjs):
