Skip to content

[Proxying] Use a dedicated worker to pass messages between threads #18563

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions emcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1676,9 +1676,6 @@ def setup_pthreads(target):
# All proxying async backends will need this.
if settings.WASMFS:
settings.REQUIRED_EXPORTS += ['emscripten_proxy_finish']
# TODO: Remove this once we no longer need the heartbeat hack in
# wasmfs/thread_utils.h
settings.REQUIRED_EXPORTS += ['emscripten_proxy_execute_queue']

# pthread stack setup and other necessary utilities
def include_and_export(name):
Expand Down
151 changes: 137 additions & 14 deletions src/library_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var LibraryPThread = {
PThread.initWorker();
} else {
PThread.initMainThread();
PThread.initMessageRelay();
}
},
initMainThread: function() {
Expand All @@ -99,13 +100,14 @@ var LibraryPThread = {
}
#endif
},

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why delete this empty line but include one before initMessageRelay?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, will be more consistent here.

initWorker: function() {
#if USE_CLOSURE_COMPILER
// worker.js is not compiled together with us, and must access certain
// things.
PThread['receiveObjectTransfer'] = PThread.receiveObjectTransfer;
PThread['threadInitTLS'] = PThread.threadInitTLS;
PThread['receiveMessageRelayPort'] = PThread.receiveMessageRelayPort;
PThread['closeMessageRelayPort'] = PThread.closeMessageRelayPort;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably figure out a way to do this that isn't do hacky (one day).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just calling these messageRelayClose and messageReleyReceive? (Probably just my person preference for shorter names so feel free to ignore)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that shorter names would be nicer, but I think it's useful to clarify that we're receiving the port and not an actual message from the relay.

#if !MINIMAL_RUNTIME
PThread['setExitStatus'] = PThread.setExitStatus;
#endif
Expand All @@ -120,6 +122,90 @@ var LibraryPThread = {
// call emscripten_unwind_to_js_event_loop to extend their lifetime beyond
// their main function. See comment in src/worker.js for more.
noExitRuntime = false;
#endif
},
initMessageRelay: function() {
// Spawn a dedicated worker for passing messages between threads. Instead
// of having each thread hold a message port for every other thread, they
// can just send messages with a `targetThread` property to the relay and
// the relay will forward the message to the intended recipient.
// Alternatively, the main thread could play this role, but then messages
// could be held up while the main thread is busy with other work.
var relayCode = '';

// On Node, first do the minimal work to get a Web-compatible messaging
// interface. Use a template literal to avoid JS parsers failing on the
// unexpected `import` token.
#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_IS_NODE) {
relayCode +=
`(await import('node:worker_threads'))
.parentPort
.on('message', (data) => onmessage({ data: data }));
Object.assign(global, {
self: global,
});
`;
}
#endif
relayCode += '(' + (() => {
// Map pthread IDs to message ports we use to communicate with those
// pthreads.
const threadPorts = new Map();

// Map recipient pthread IDs for whom we don't yet have a message port
// to messages we've received for them.
const bufferedMessages = new Map();

function handleMessage(msg) {
const thread = msg.data.targetThread;
const port = threadPorts.get(thread);
if (port) {
port.postMessage(msg.data, msg.data.transferList);
} else {
// Hold on to the message until we receive a port for the recipient.
if (!bufferedMessages.has(thread)) {
bufferedMessages.set(thread, []);
}
bufferedMessages.get(thread).push(msg);
}
}

self.onmessage = (msg) => {
const cmd = msg.data.cmd;
const thread = msg.data.thread;
if (cmd === 'create') {
const port = msg.data.port;
threadPorts.set(thread, port);
port.onmessage = handleMessage;
// Forward any messages we have already received for this thread.
if (bufferedMessages.has(thread)) {
bufferedMessages.get(thread).forEach(handleMessage);
bufferedMessages.delete(thread);
}
} else if (cmd === 'destroy') {
bufferedMessages.delete(thread);
}
#if ASSERTIONS
else {
console.error('unrecognized message relay command:', cmd);
}
#endif
};
}).toString() + ')()';

var url = 'data:text/javascript,' + encodeURIComponent(relayCode);
#if ASSERTIONS
PThread.messageRelay = new Worker(url, {name: "message-relay"});
#else
PThread.messageRelay = new Worker(url);
#endif
#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_IS_NODE) {
// Do not keep Node alive if the message relay is the only thing
// running.
PThread.messageRelay.unref();
}
#endif
},

Expand Down Expand Up @@ -251,17 +337,9 @@ var LibraryPThread = {
// accessible variable about the thread that initiated the proxying.
if (worker.pthread_ptr) PThread.currentProxiedOperationCallerThread = worker.pthread_ptr;

// If this message is intended to a recipient that is not the main thread, forward it to the target thread.
if (d['targetThread'] && d['targetThread'] != _pthread_self()) {
var targetWorker = PThread.pthreads[d.targetThread];
if (targetWorker) {
targetWorker.postMessage(d, d['transferList']);
} else {
err('Internal error! Worker sent a message "' + cmd + '" to target pthread ' + d['targetThread'] + ', but that thread no longer exists!');
}
PThread.currentProxiedOperationCallerThread = undefined;
return;
}
#if ASSERTIONS
assert(!d['targetThread'] || d['targetThread'] == _pthread_self());
#endif

if (cmd === 'processProxyingQueue') {
executeNotifiedProxyingQueue(d['queue']);
Expand Down Expand Up @@ -507,6 +585,21 @@ var LibraryPThread = {
#endif
}
return PThread.unusedWorkers.pop();
},

receiveMessageRelayPort: function(port) {
#if ASSERTIONS
assert(ENVIRONMENT_IS_PTHREAD);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap asserts in #if ASSERTIONS

#endif
PThread.messageRelay = port;
},

closeMessageRelayPort: function() {
#if ASSERTIONS
assert(ENVIRONMENT_IS_PTHREAD);
#endif
PThread.messageRelay.close();
delete PThread.messageRelay;
}
},

Expand All @@ -522,6 +615,7 @@ var LibraryPThread = {
var worker = PThread.pthreads[pthread_ptr];
delete PThread.pthreads[pthread_ptr];
worker.terminate();
PThread.messageRelay.postMessage({'cmd': 'destroy', 'thread': pthread_ptr});
__emscripten_thread_free_data(pthread_ptr);
// The worker was completely nuked (not just the pthread execution it was hosting), so remove it from running workers
// but don't put it back to the pool.
Expand All @@ -544,6 +638,7 @@ var LibraryPThread = {
assert(!ENVIRONMENT_IS_PTHREAD, 'Internal Error! cleanupThread() can only ever be called from main application thread!');
assert(pthread_ptr, 'Internal Error! Null pthread_ptr in cleanupThread!');
#endif
PThread.messageRelay.postMessage({'cmd': 'destroy', 'thread': pthread_ptr});
var worker = PThread.pthreads[pthread_ptr];
assert(worker);
PThread.returnWorkerToPool(worker);
Expand Down Expand Up @@ -621,11 +716,20 @@ var LibraryPThread = {
PThread.pthreads[threadParams.pthread_ptr] = worker;

worker.pthread_ptr = threadParams.pthread_ptr;

#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_IS_NODE) {
// TODO: This isn't necessary in Node 18+
global.MessageChannel = require('worker_threads').MessageChannel;
}
#endif
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid declaring a new name here and just polyfil on node using var MessageChannel = require ...?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(or global.MessageChannel = require ...)

var relayChannel = new MessageChannel();
var msg = {
'cmd': 'run',
'start_routine': threadParams.startRoutine,
'arg': threadParams.arg,
'pthread_ptr': threadParams.pthread_ptr,
'port': relayChannel.port1,
};
#if OFFSCREENCANVAS_SUPPORT
// Note that we do not need to quote these names because they are only used
Expand All @@ -641,7 +745,16 @@ var LibraryPThread = {
worker.ref();
}
#endif
worker.postMessage(msg, threadParams.transferList);
// Send one end of the relay channel to the newly created thread and the
// other end to the messageRelay worker so that other threads can send
// messages to the new thread through the messageRelay.
worker.postMessage(msg,
threadParams.transferList.concat([relayChannel.port1]));
PThread.messageRelay.postMessage({
'cmd': 'create',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a comment here.. Such as "Send one end of the relay channel to the newly created thread, and the other end of the messageRelay worker"?

'thread': threadParams.pthread_ptr,
'port': relayChannel.port2,
}, [relayChannel.port2]);
return 0;
},

Expand Down Expand Up @@ -1062,6 +1175,7 @@ var LibraryPThread = {
checkStackCookie();
#endif
#if MINIMAL_RUNTIME
PThread.closeMessageRelayPort();
// In MINIMAL_RUNTIME the noExitRuntime concept does not apply to
// pthreads. To exit a pthread with live runtime, use the function
// emscripten_unwind_to_js_event_loop() in the pthread body.
Expand All @@ -1070,6 +1184,7 @@ var LibraryPThread = {
if (keepRuntimeAlive()) {
PThread.setExitStatus(result);
} else {
PThread.closeMessageRelayPort();
__emscripten_thread_exit(result);
}
#endif
Expand All @@ -1096,9 +1211,17 @@ var LibraryPThread = {
_emscripten_notify_task_queue: function(targetThreadId, currThreadId, mainThreadId, queue) {
if (targetThreadId == currThreadId) {
setTimeout(() => executeNotifiedProxyingQueue(queue));
} else if (targetThreadId == mainThreadId) {
// Messages to the main thread do not go through the `messageRelay` since
// every worker has the capability to message the main thread directly.
postMessage({'cmd' : 'processProxyingQueue', 'queue' : queue});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the main thread special here? (Is this just an optimization?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, in principle the main thread could listen only on the message broker's channel for messages from all other threads, but since it already listens directly to every thread and the code isn't that complex, I figured it would be better to keep messages to the main thread as 1 hop instead of 2.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps worth a comment?

} else if (ENVIRONMENT_IS_PTHREAD) {
postMessage({'targetThread' : targetThreadId, 'cmd' : 'processProxyingQueue', 'queue' : queue});
// We are a pthread messaging another pthread, so go through the
// `messageRelay`.
PThread.messageRelay.postMessage({'targetThread' : targetThreadId, 'cmd' : 'processProxyingQueue', 'queue' : queue});
} else {
// We are the main thread messaging a pthread. We can message pthreads
// directly, so do not go through the `messageRelay`.
var worker = PThread.pthreads[targetThreadId];
if (!worker) {
#if ASSERTIONS
Expand Down
23 changes: 22 additions & 1 deletion src/shell.js
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,34 @@ if (ENVIRONMENT_IS_NODE) {

#if USE_PTHREADS
let nodeWorkerThreads;
#if ASSERTIONS
try {
nodeWorkerThreads = require('worker_threads');
} catch (e) {
console.error('The "worker_threads" module is not supported in this node.js build - perhaps a newer version is needed?');
throw e;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, by we should probably skip this whole try/catch in release builds.

}
global.Worker = nodeWorkerThreads.Worker;
#else
nodeWorkerThreads = require('worker_threads');
#endif
/**
* @constructor
* @param {string|URL} url
*/
let NodeWorker = nodeWorkerThreads.Worker;
// Create a polyfill for the Worker Web API based on Node's `worker_threads`.
// Specifically, paper over the difference that Node requires data and file
// protocol urls to be URLs while the Web expects them to be strings.
class Worker extends NodeWorker {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ES6 classes are not yet used within the library sources, so I think this also requires an update to settings.TRANSPILE_TO_ES5.

Details
--- a/emcc.py
+++ b/emcc.py
@@ -2176,13 +2176,14 @@ def phase_linker_setup(options, state, newargs):
     # Emscripten requires certain ES6 constructs by default in library code
     # - https://caniuse.com/let              : EDGE:12 FF:44 CHROME:49 SAFARI:11
     # - https://caniuse.com/const            : EDGE:12 FF:36 CHROME:49 SAFARI:11
+    # - https://caniuse.com/class            : EDGE:13 FF:45 CHROME:49 SAFARI:9
     # - https://caniuse.com/arrow-functions: : EDGE:12 FF:22 CHROME:45 SAFARI:10
     # - https://caniuse.com/mdn-javascript_builtins_object_assign:
     #                                          EDGE:12 FF:34 CHROME:45 SAFARI:9
     # Taking the highest requirements gives is our minimum:
-    #                             Max Version: EDGE:12 FF:44 CHROME:49 SAFARI:11
-    settings.TRANSPILE_TO_ES5 = (settings.MIN_EDGE_VERSION < 12 or
-                                 settings.MIN_FIREFOX_VERSION < 44 or
+    #                             Max Version: EDGE:13 FF:45 CHROME:49 SAFARI:11
+    settings.TRANSPILE_TO_ES5 = (settings.MIN_EDGE_VERSION < 13 or
+                                 settings.MIN_FIREFOX_VERSION < 45 or
                                  settings.MIN_CHROME_VERSION < 49 or
                                  settings.MIN_SAFARI_VERSION < 110000 or
                                  settings.MIN_IE_VERSION != 0x7FFFFFFF)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Browser versions old enough to not support classes also don't support threads at all, so I don't think this should ever need to be polyfilled. Furthermore, the polyfilling doesn't actually work because it causes node to error out with TypeError: Class constructor Worker cannot be invoked without 'new' 🤯

constructor(url, ...rest) {
if (typeof url === 'string' &&
(url.startsWith('data:') || url.startsWith('file:'))) {
url = new URL(url);
}
super(url, ...rest);
}
}
global.Worker = Worker;
#endif

#if WASM == 2
Expand Down
36 changes: 36 additions & 0 deletions src/shell_minimal.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,42 @@ function ready() {

#if USE_PTHREADS

// Worker polyfill copied from shell.js. Keep these in sync.
// TODO: Deduplicate?
#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_IS_NODE) {
let nodeWorkerThreads;
#if ASSERTIONS
try {
nodeWorkerThreads = require('worker_threads');
} catch (e) {
console.error('The "worker_threads" module is not supported in this node.js build - perhaps a newer version is needed?');
throw e;
}
#else
nodeWorkerThreads = require('worker_threads');
#endif
/**
* @constructor
* @param {string|URL} url
*/
let NodeWorker = nodeWorkerThreads.Worker;
// Create a polyfill for the Worker Web API based on Node's `worker_threads`.
// Specifically, paper over the difference that Node requires data and file
// protocol urls to be URLs while the Web expects them to be strings.
class Worker extends NodeWorker {
constructor(url, ...rest) {
if (typeof url === 'string' &&
(url.startsWith('data:') || url.startsWith('file:'))) {
url = new URL(url);
}
super(url, ...rest);
}
}
global.Worker = Worker;
}
#endif // ENVIRONMENT_MAY_BE_NODE

#if !MODULARIZE
// In MODULARIZE mode _scriptDir needs to be captured already at the very top of the page immediately when the page is parsed, so it is generated there
// before the page load. In non-MODULARIZE modes generate it here.
Expand Down
4 changes: 4 additions & 0 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ function handleMessage(e) {
#endif
#endif // MODULARIZE && EXPORT_ES6
} else if (e.data.cmd === 'run') {
Module['PThread'].receiveMessageRelayPort(e.data.port);
e.data.port.onmessage = handleMessage;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we use the same handler for messages regardless of where they come from? interesting...

// Pass the thread address to wasm to store it for fast access.
Module['__emscripten_thread_init'](e.data.pthread_ptr, /*isMainBrowserThread=*/0, /*isMainRuntimeThread=*/0, /*canBlock=*/1);

Expand Down Expand Up @@ -264,6 +266,7 @@ function handleMessage(e) {
#if ASSERTIONS
err('Pthread 0x' + Module['_pthread_self']().toString(16) + ' called exit(), calling _emscripten_thread_exit.');
#endif
Module['PThread'].closeMessageRelayPort();
Module['__emscripten_thread_exit'](ex.status);
}
}
Expand All @@ -284,6 +287,7 @@ function handleMessage(e) {
}
} else if (e.data.cmd === 'cancel') { // Main thread is asking for a pthread_cancel() on this thread.
if (Module['_pthread_self']()) {
Module['PThread'].closeMessageRelayPort();
Module['__emscripten_thread_exit']({{{ cDefine('PTHREAD_CANCELED') }}});
}
} else if (e.data.target === 'setimmediate') {
Expand Down
23 changes: 0 additions & 23 deletions system/lib/wasmfs/thread_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,6 @@ class ProxyWorker {
}
cond.notify_all();

// Sometimes the main thread is spinning, waiting on a WasmFS lock held
// by a thread trying to proxy work to this dedicated worker. In that
// case, the proxying message won't be relayed by the main thread and
// the system will deadlock. This heartbeat ensures that proxying work
// eventually gets done so the thread holding the lock can make forward
// progress even if the main thread is blocked.
//
// TODO: Remove this once we can postMessage directly between workers
// without involving the main thread.
//
// Note that this requires adding _emscripten_proxy_execute_queue to
// EXPORTED_FUNCTIONS.
EM_ASM({
var intervalID =
setInterval(() => {
if (ABORT) {
clearInterval(intervalID);
} else {
_emscripten_proxy_execute_queue($0);
}
}, 50);
}, queue.queue);

// Sit in the event loop performing work as it comes in.
emscripten_exit_with_live_runtime();
}) {
Expand Down
2 changes: 1 addition & 1 deletion test/other/metadce/test_metadce_minimal_pthreads.jssize
Original file line number Diff line number Diff line change
@@ -1 +1 @@
15763
17031