Skip to content

Allow pthreads on Node.js without a pthread pool #18305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ See docs/process.md for more on how version tagging works.
-----------------------
- Added `Module.pthreadPoolReady` promise for the `PTHREAD_POOL_DELAY_LOAD`
mode that allows to safely join spawned threads. (#18281)
- PThreads can now be safely spawned on-demand in Node.js even without a PThread
pool (`PTHREAD_POOL_SIZE`) or proxying (`PROXY_TO_PTHREAD`) options. (#18305)

3.1.28 - 12/08/22
-----------------
Expand Down
2 changes: 2 additions & 0 deletions emcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2314,6 +2314,8 @@ def phase_linker_setup(options, state, newargs):
if settings.USE_PTHREADS:
setup_pthreads(target)
settings.JS_LIBRARIES.append((0, 'library_pthread.js'))
if settings.PROXY_TO_PTHREAD:
settings.PTHREAD_POOL_SIZE_STRICT = 0
else:
if settings.PROXY_TO_PTHREAD:
exit_with_error('-sPROXY_TO_PTHREAD requires -sUSE_PTHREADS to work!')
Expand Down
57 changes: 28 additions & 29 deletions src/library_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -277,18 +277,15 @@ var LibraryPThread = {
} else if (cmd === 'loaded') {
worker.loaded = true;
#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_IS_NODE) {
// Check that this worker doesn't have an associated pthread.
if (ENVIRONMENT_IS_NODE && !worker.pthread_ptr) {
// Once worker is loaded & idle, mark it as weakly referenced,
// so that mere existence of a Worker in the pool does not prevent
// Node.js from exiting the app.
worker.unref();
}
#endif
onFinishedLoading(worker);
// If this Worker is already pending to start running a thread, launch the thread now
if (worker.runPthread) {
worker.runPthread();
}
} else if (cmd === 'print') {
out('Thread ' + d['threadId'] + ': ' + d['text']);
} else if (cmd === 'printErr') {
Expand Down Expand Up @@ -482,22 +479,30 @@ var LibraryPThread = {

getNewWorker: function() {
if (PThread.unusedWorkers.length == 0) {
#if !PROXY_TO_PTHREAD && PTHREAD_POOL_SIZE_STRICT
// PTHREAD_POOL_SIZE_STRICT should show a warning and, if set to level `2`, return from the function.
#if (PTHREAD_POOL_SIZE_STRICT && ASSERTIONS) || PTHREAD_POOL_SIZE_STRICT == 2
// However, if we're in Node.js, then we can create new workers on the fly and PTHREAD_POOL_SIZE_STRICT
// should be ignored altogether.
#if ENVIRONMENT_MAY_BE_NODE
if (!ENVIRONMENT_IS_NODE) {
#endif
#if ASSERTIONS
err('Tried to spawn a new thread, but the thread pool is exhausted.\n' +
'This might result in a deadlock unless some threads eventually exit or the code explicitly breaks out to the event loop.\n' +
'If you want to increase the pool size, use setting `-sPTHREAD_POOL_SIZE=...`.'
err('Tried to spawn a new thread, but the thread pool is exhausted.\n' +
'This might result in a deadlock unless some threads eventually exit or the code explicitly breaks out to the event loop.\n' +
'If you want to increase the pool size, use setting `-sPTHREAD_POOL_SIZE=...`.'
#if PTHREAD_POOL_SIZE_STRICT == 1
+ '\nIf you want to throw an explicit error instead of the risk of deadlocking in those cases, use setting `-sPTHREAD_POOL_SIZE_STRICT=2`.'
+ '\nIf you want to throw an explicit error instead of the risk of deadlocking in those cases, use setting `-sPTHREAD_POOL_SIZE_STRICT=2`.'
#endif
);
);
#endif // ASSERTIONS

#if PTHREAD_POOL_SIZE_STRICT == 2
return;
#endif
#if !PROXY_TO_PTHREAD && PTHREAD_POOL_SIZE_STRICT == 2
// Don't return a Worker, which will translate into an EAGAIN error.
return;
#else
#if ENVIRONMENT_MAY_BE_NODE
}
#endif
#endif // PTHREAD_POOL_SIZE_STRICT
#if PTHREAD_POOL_SIZE_STRICT < 2 || ENVIRONMENT_MAY_BE_NODE
PThread.allocateUnusedWorker();
PThread.loadWasmModuleToWorker(PThread.unusedWorkers[0]);
#endif
Expand Down Expand Up @@ -629,21 +634,15 @@ var LibraryPThread = {
msg.moduleCanvasId = threadParams.moduleCanvasId;
msg.offscreenCanvases = threadParams.offscreenCanvases;
#endif
worker.runPthread = () => {
// Ask the worker to start executing its pthread entry point function.
// Ask the worker to start executing its pthread entry point function.
#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_IS_NODE) {
// Mark worker as strongly referenced once we start executing a pthread,
// so that Node.js doesn't exit while the pthread is running.
worker.ref();
}
#endif
worker.postMessage(msg, threadParams.transferList);
delete worker.runPthread;
};
if (worker.loaded) {
worker.runPthread();
if (ENVIRONMENT_IS_NODE) {
// Mark worker as strongly referenced once we start executing a pthread,
// so that Node.js doesn't exit while the pthread is running.
worker.ref();
}
#endif
worker.postMessage(msg, threadParams.transferList);
return 0;
},

Expand Down
32 changes: 22 additions & 10 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,7 @@ self.onunhandledrejection = (e) => {
throw e.reason ?? e;
};

// Add a callback for when the runtime is initialized.
self.startWorker = (instance) => {
#if MODULARIZE
Module = instance;
#endif
// Notify the main thread that this thread has loaded.
postMessage({ 'cmd': 'loaded' });
};

self.onmessage = (e) => {
function handleMessage(e) {
try {
if (e.data.cmd === 'load') { // Preload command that is called once per worker to parse and load the Emscripten code.
#if PTHREADS_DEBUG
Expand All @@ -133,6 +124,25 @@ self.onmessage = (e) => {
var imports = {};
#endif

// Until we initialize the runtime, queue up any further incoming messages.
let messageQueue = [];
self.onmessage = (e) => messageQueue.push(e);

// And add a callback for when the runtime is initialized.
self.startWorker = (instance) => {
#if MODULARIZE
Module = instance;
#endif
// Notify the main thread that this thread has loaded.
postMessage({ 'cmd': 'loaded' });
// Process any messages that were queued before the thread was ready.
for (let msg of messageQueue) {
handleMessage(msg);
}
// Restore the real message handler.
self.onmessage = handleMessage;
};

// Module and memory were sent from main thread
#if MINIMAL_RUNTIME
#if MODULARIZE
Expand Down Expand Up @@ -303,3 +313,5 @@ self.onmessage = (e) => {
throw ex;
}
};

self.onmessage = handleMessage;
26 changes: 19 additions & 7 deletions test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9129,9 +9129,13 @@ def test_pthread_create(self):
self.do_run_in_out_file_test('core/pthread/create.cpp')

@node_pthreads
def test_pthread_c11_threads(self):
self.set_setting('PROXY_TO_PTHREAD')
self.set_setting('EXIT_RUNTIME')
@parameterized({
'': ([],),
'pooled': (['-sPTHREAD_POOL_SIZE=1'],),
'proxied': (['-sPROXY_TO_PTHREAD', '-sEXIT_RUNTIME'],),
})
def test_pthread_c11_threads(self, args):
self.emcc_args += args
self.set_setting('PTHREADS_DEBUG')
if not self.has_changed_setting('INITIAL_MEMORY'):
self.set_setting('INITIAL_MEMORY', '64mb')
Expand All @@ -9140,13 +9144,21 @@ def test_pthread_c11_threads(self):
self.do_run_in_out_file_test('pthread/test_pthread_c11_threads.c')

@node_pthreads
def test_pthread_cxx_threads(self):
self.set_setting('PTHREAD_POOL_SIZE', 1)
@parameterized({
'': (0,),
'pooled': (1,),
})
def test_pthread_cxx_threads(self, pthread_pool_size):
self.set_setting('PTHREAD_POOL_SIZE', pthread_pool_size)
self.do_run_in_out_file_test('pthread/test_pthread_cxx_threads.cpp')

@node_pthreads
def test_pthread_busy_wait(self):
self.set_setting('PTHREAD_POOL_SIZE', 1)
@parameterized({
'': (0,),
'pooled': (1,),
})
def test_pthread_busy_wait(self, pthread_pool_size):
self.set_setting('PTHREAD_POOL_SIZE', pthread_pool_size)
self.do_run_in_out_file_test('pthread/test_pthread_busy_wait.cpp')

@node_pthreads
Expand Down