Skip to content

Commit d58e058

Browse files
committed
Allow pthreads on Node.js without a pthread pool
Node.js `worker_threads` differ from browser `Worker`s in that they start spawning synchronously, without waiting for an event loop tick. If we can also avoid waiting for the "loaded" event from the worker before sending pthread messages to it, then we can spawn new pthreads "synchronously" (preparing everything within the same event loop tick) and then block the current thread, making the behaviour on Node.js a lot closer to native and avoiding the need for a pthread pool even without Asyncify or extra helper workers. That's what this implementation does. Instead of waiting for the worker to tell us it's loaded and ready, we send all commands to the worker immediately. The worker accepts the first "load" message, starts initializing the runtime, and meanwhile queues up any further messages such as "run". Once the runtime is ready, it processes the queue. I could have limited these changes to Node.js only, but handling Node.js and browsers together is easier, lets us avoid a custom `worker.runPthread` callback, and should in theory even be a bit faster in browsers too (by avoiding the "loaded" round trip).
1 parent c93b3a1 commit d58e058

File tree

4 files changed

+71
-45
lines changed

4 files changed

+71
-45
lines changed

emcc.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2274,6 +2274,8 @@ def phase_linker_setup(options, state, newargs):
22742274
if settings.USE_PTHREADS:
22752275
setup_pthreads(target)
22762276
settings.JS_LIBRARIES.append((0, 'library_pthread.js'))
2277+
if settings.PROXY_TO_PTHREAD:
2278+
settings.PTHREAD_POOL_SIZE_STRICT = 0
22772279
else:
22782280
if settings.PROXY_TO_PTHREAD:
22792281
exit_with_error('-sPROXY_TO_PTHREAD requires -sUSE_PTHREADS to work!')

src/library_pthread.js

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -277,18 +277,15 @@ var LibraryPThread = {
277277
} else if (cmd === 'loaded') {
278278
worker.loaded = true;
279279
#if ENVIRONMENT_MAY_BE_NODE
280-
if (ENVIRONMENT_IS_NODE) {
280+
// Check that this worker doesn't have an associated pthread.
281+
if (ENVIRONMENT_IS_NODE && !worker.pthread_ptr) {
281282
// Once worker is loaded & idle, mark it as weakly referenced,
282283
// so that mere existence of a Worker in the pool does not prevent
283284
// Node.js from exiting the app.
284285
worker.unref();
285286
}
286287
#endif
287288
if (onFinishedLoading) onFinishedLoading(worker);
288-
// If this Worker is already pending to start running a thread, launch the thread now
289-
if (worker.runPthread) {
290-
worker.runPthread();
291-
}
292289
} else if (cmd === 'print') {
293290
out('Thread ' + d['threadId'] + ': ' + d['text']);
294291
} else if (cmd === 'printErr') {
@@ -455,22 +452,30 @@ var LibraryPThread = {
455452

456453
getNewWorker: function() {
457454
if (PThread.unusedWorkers.length == 0) {
458-
#if !PROXY_TO_PTHREAD && PTHREAD_POOL_SIZE_STRICT
455+
// PTHREAD_POOL_SIZE_STRICT should show a warning and, if set to level `2`, return from the function.
456+
#if PTHREAD_POOL_SIZE_STRICT && ASSERTIONS || PTHREAD_POOL_SIZE_STRICT == 2
457+
// However, if we're in Node.js, then we can create new workers on the fly and PTHREAD_POOL_SIZE_STRICT
458+
// should be ignored altogether.
459+
#if ENVIRONMENT_MAY_BE_NODE
460+
if (!ENVIRONMENT_IS_NODE) {
461+
#endif
459462
#if ASSERTIONS
460-
err('Tried to spawn a new thread, but the thread pool is exhausted.\n' +
461-
'This might result in a deadlock unless some threads eventually exit or the code explicitly breaks out to the event loop.\n' +
462-
'If you want to increase the pool size, use setting `-sPTHREAD_POOL_SIZE=...`.'
463+
err('Tried to spawn a new thread, but the thread pool is exhausted.\n' +
464+
'This might result in a deadlock unless some threads eventually exit or the code explicitly breaks out to the event loop.\n' +
465+
'If you want to increase the pool size, use setting `-sPTHREAD_POOL_SIZE=...`.'
463466
#if PTHREAD_POOL_SIZE_STRICT == 1
464-
+ '\nIf you want to throw an explicit error instead of the risk of deadlocking in those cases, use setting `-sPTHREAD_POOL_SIZE_STRICT=2`.'
467+
+ '\nIf you want to throw an explicit error instead of the risk of deadlocking in those cases, use setting `-sPTHREAD_POOL_SIZE_STRICT=2`.'
465468
#endif
466-
);
469+
);
467470
#endif // ASSERTIONS
468-
471+
#if PTHREAD_POOL_SIZE_STRICT == 2
472+
return;
469473
#endif
470-
#if !PROXY_TO_PTHREAD && PTHREAD_POOL_SIZE_STRICT == 2
471-
// Don't return a Worker, which will translate into an EAGAIN error.
472-
return;
473-
#else
474+
#if ENVIRONMENT_MAY_BE_NODE
475+
}
476+
#endif
477+
#endif // PTHREAD_POOL_SIZE_STRICT
478+
#if PTHREAD_POOL_SIZE_STRICT < 2 || ENVIRONMENT_MAY_BE_NODE
474479
PThread.allocateUnusedWorker();
475480
PThread.loadWasmModuleToWorker(PThread.unusedWorkers[0]);
476481
#endif
@@ -602,21 +607,15 @@ var LibraryPThread = {
602607
msg.moduleCanvasId = threadParams.moduleCanvasId;
603608
msg.offscreenCanvases = threadParams.offscreenCanvases;
604609
#endif
605-
worker.runPthread = () => {
606-
// Ask the worker to start executing its pthread entry point function.
610+
// Ask the worker to start executing its pthread entry point function.
607611
#if ENVIRONMENT_MAY_BE_NODE
608-
if (ENVIRONMENT_IS_NODE) {
609-
// Mark worker as strongly referenced once we start executing a pthread,
610-
// so that Node.js doesn't exit while the pthread is running.
611-
worker.ref();
612-
}
613-
#endif
614-
worker.postMessage(msg, threadParams.transferList);
615-
delete worker.runPthread;
616-
};
617-
if (worker.loaded) {
618-
worker.runPthread();
612+
if (ENVIRONMENT_IS_NODE) {
613+
// Mark worker as strongly referenced once we start executing a pthread,
614+
// so that Node.js doesn't exit while the pthread is running.
615+
worker.ref();
619616
}
617+
#endif
618+
worker.postMessage(msg, threadParams.transferList);
620619
return 0;
621620
},
622621

src/worker.js

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,7 @@ self.onunhandledrejection = (e) => {
114114
throw e.reason ?? e;
115115
};
116116

117-
// Add a callback for when the runtime is initialized.
118-
self.startWorker = (instance) => {
119-
#if MODULARIZE
120-
Module = instance;
121-
#endif
122-
// Notify the main thread that this thread has loaded.
123-
postMessage({ 'cmd': 'loaded' });
124-
};
125-
126-
self.onmessage = (e) => {
117+
function handleMessage(e) {
127118
try {
128119
if (e.data.cmd === 'load') { // Preload command that is called once per worker to parse and load the Emscripten code.
129120
#if PTHREADS_DEBUG
@@ -133,6 +124,25 @@ self.onmessage = (e) => {
133124
var imports = {};
134125
#endif
135126

127+
// Until we initialize the runtime, queue up any further incoming messages.
128+
let messageQueue = [];
129+
self.onmessage = (e) => messageQueue.push(e);
130+
131+
// And add a callback for when the runtime is initialized.
132+
self.startWorker = (instance) => {
133+
#if MODULARIZE
134+
Module = instance;
135+
#endif
136+
// Notify the main thread that this thread has loaded.
137+
postMessage({ 'cmd': 'loaded' });
138+
// Process any messages that were queued before the thread was ready.
139+
for (let msg of messageQueue) {
140+
handleMessage(msg);
141+
}
142+
// Restore the real message handler.
143+
self.onmessage = handleMessage;
144+
};
145+
136146
// Module and memory were sent from main thread
137147
#if MINIMAL_RUNTIME
138148
#if MODULARIZE
@@ -303,3 +313,5 @@ self.onmessage = (e) => {
303313
throw ex;
304314
}
305315
};
316+
317+
self.onmessage = handleMessage;

test/test_core.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9116,8 +9116,13 @@ def test_pthread_create(self):
91169116
self.do_run_in_out_file_test('core/pthread/create.cpp')
91179117

91189118
@node_pthreads
9119-
def test_pthread_c11_threads(self):
9120-
self.set_setting('PROXY_TO_PTHREAD')
9119+
@parameterized({
9120+
'unpooled': ([],),
9121+
'pooled': (['-sPTHREAD_POOL_SIZE=1'],),
9122+
'proxied': (['-sPROXY_TO_PTHREAD'],),
9123+
})
9124+
def test_pthread_c11_threads(self, args):
9125+
self.emcc_args += args
91219126
self.set_setting('EXIT_RUNTIME')
91229127
self.set_setting('PTHREADS_DEBUG')
91239128
if not self.has_changed_setting('INITIAL_MEMORY'):
@@ -9127,13 +9132,21 @@ def test_pthread_c11_threads(self):
91279132
self.do_run_in_out_file_test('pthread/test_pthread_c11_threads.c')
91289133

91299134
@node_pthreads
9130-
def test_pthread_cxx_threads(self):
9131-
self.set_setting('PTHREAD_POOL_SIZE', 1)
9135+
@parameterized({
9136+
'unpooled': (0,),
9137+
'pooled': (1,),
9138+
})
9139+
def test_pthread_cxx_threads(self, pthread_pool_size):
9140+
self.set_setting('PTHREAD_POOL_SIZE', pthread_pool_size)
91329141
self.do_run_in_out_file_test('pthread/test_pthread_cxx_threads.cpp')
91339142

91349143
@node_pthreads
9135-
def test_pthread_busy_wait(self):
9136-
self.set_setting('PTHREAD_POOL_SIZE', 1)
9144+
@parameterized({
9145+
'unpooled': (0,),
9146+
'pooled': (1,),
9147+
})
9148+
def test_pthread_busy_wait(self, pthread_pool_size):
9149+
self.set_setting('PTHREAD_POOL_SIZE', pthread_pool_size)
91379150
self.do_run_in_out_file_test('pthread/test_pthread_busy_wait.cpp')
91389151

91399152
@node_pthreads

0 commit comments

Comments
 (0)