-
Notifications
You must be signed in to change notification settings - Fork 3.4k
[Proxying] Use a dedicated worker to pass messages between threads #18563
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Current dependencies on/for this PR: This comment was auto-generated by Graphite. |
src/library_pthread.js
Outdated
#if ENVIRONMENT_MAY_BE_NODE | ||
(ENVIRONMENT_IS_NODE ? | ||
` | ||
(await import('node:worker_threads')) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately I can't put this header in proxy_worker.js because Chrome complains about the existence of the import
token, even if it isn't executed. Ideas for cleaner workarounds very welcome.
src/proxy_broker.js
Outdated
let buffered = bufferedMessages.get(thread); | ||
if (buffered === undefined) { | ||
buffered = []; | ||
bufferedMessages.set(thread, buffered); | ||
} | ||
buffered.push(msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JS has map.has
which is shorter for stuff like this:
let buffered = bufferedMessages.get(thread); | |
if (buffered === undefined) { | |
buffered = []; | |
bufferedMessages.set(thread, buffered); | |
} | |
buffered.push(msg); | |
if (!bufferedMessages.has(thread)) { | |
bufferedMessages.set(thread, []); | |
} | |
bufferedMessages.get(thread).push(msg); |
But it may be slightly less efficient OTOH, I'm not sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I was trying to minimize the number of lookups to be 1 or 2 rather than 2 or 3. (I was annoyed that there's no way to do this in 1 lookup like I could in C++.) Do you think minimizing the code is more important than minimizing the lookups?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, in general I'd guess this is not very hot code (we don't pass that many messages using postMessage). So maybe size and clarity matter more?
src/worker.js
Outdated
@@ -284,6 +287,7 @@ function handleMessage(e) { | |||
} | |||
} else if (e.data.cmd === 'cancel') { // Main thread is asking for a pthread_cancel() on this thread. | |||
if (Module['_pthread_self']()) { | |||
closeProxyBrokerPort(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these need to go through Module
, like the calls around them. worker.js
is a separate file from the main JS.
Or, we can just add a proxy closing function here perhaps if it is short and trivial?
@@ -1094,8 +1135,10 @@ var LibraryPThread = { | |||
_emscripten_notify_task_queue: function(targetThreadId, currThreadId, mainThreadId, queue) { | |||
if (targetThreadId == currThreadId) { | |||
setTimeout(() => executeNotifiedProxyingQueue(queue)); | |||
} else if (targetThreadId == mainThreadId) { | |||
postMessage({'cmd' : 'processProxyingQueue', 'queue' : queue}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the main thread special here? (Is this just an optimization?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, in principle the main thread could listen only on the message broker's channel for messages from all other threads, but since it already listens directly to every thread and the code isn't that complex, I figured it would be better to keep messages to the main thread as 1 hop instead of 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps worth a comment?
I wonder if "broker" is the right work here? How about "relay" instead? |
Sure, I'll change the name to "messageRelay" |
I'm pretty sure the tests are failing because I never terminate the message relay worker. I tried to figure out where a good place to do that is, but I couldn't find one. It seems that we shouldn't terminate it if we should keep the runtime alive, but AFAICT we never explicitly decide to stop keeping the runtime alive as we terminate the process. @sbc100 or @kripken, do you have suggestions about when and where we should terminate the message relay? |
What do you mean by "terminate?" If it's waiting in the event loop, I think it's fine to leave it. It will be cleaned up by the browser when the tab closes. |
How about doing it as part of |
@@ -97,7 +98,6 @@ var LibraryPThread = { | |||
} | |||
#endif | |||
}, | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why delete this empty line but include one before initMessageRelay
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, will be more consistent here.
src/library_pthread.js
Outdated
function handleMessage(msg) { | ||
const thread = msg.data.targetThread; | ||
const port = threadPorts.get(thread); | ||
if (port !== undefined) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this just be if (port)
?
src/library_pthread.js
Outdated
bufferedMessages.delete(thread); | ||
} | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
else if
instead of return here? .. and then maybe assert in the final else
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have assert
available in this worker, but I will print an error at least.
} | ||
#else | ||
MsgChannel = MessageChannel; | ||
#endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid declaring a new name here and just polyfil on node using var MessageChannel = require ...
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(or global.MessageChannel = require ...
)
The browser already works fine, I think. Specifically I'm seeing that node doesn't exit, but I traced everything that happened at process exit and it never actually calls terminateAllThreads, so killing the worker in terminateAllThreads wouldn't solve the problem. |
Are you building with |
I'm not building with EXIT_RUNTIME, but given that Node exited without that option before this PR, it would be a serious regression to start requiring EXIT_RUNTIME to get Node to exit. |
Rather than forwarding messages from one pthread to another through the main thread, forward them through a dedicated worker, decreasing message latency in cases where the main thread is busy. The new message broker worker is spawned via a data URL so there are no new JS files for users to distribute. Whenever a pthread is created, a new MessageChannel is created alongside it, with one MessagePort sent to the broker and the other MessagePort sent to the pthread's worker. The broker receives messages on these MessagePorts and forwards them on to their recipients' MessagePorts. It is possible for the broker to receive a message for a thread for which the broker has no MessagePort. This can happen when pthread_create returns a pthread_t before the main thread has finished asynchronously spinning up a new worker to run the thread. The spawning thread may then immediately proxy work to the new thread, causing a message to be sent to the broker before the main thread has notified the broker of the new thread's existence and sent the broker the new thread's MessagePort. When this happens the broker buffers the message and forwards it along once it has received the recipient's MessagePort. When a pthread exits or is cancelled, it closes its MessagePort so that no further messages will be received on it, then the main thread notifies the broker that the thread has been destroyed so the broker can release its resources for that thread. Messages that make it to the broker after the exiting thread has closed its MessagePort will be silently dropped or sent to the next thread spawned with the same ID, but we have always considered it user error to proxy to an exiting thread, so it's unclear whether doing anything better in that case would be worth it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! I'm surprised how simple this turned out to be. The only gross part the data URL stuff..
@@ -104,6 +104,8 @@ var LibraryPThread = { | |||
// things. | |||
PThread['receiveObjectTransfer'] = PThread.receiveObjectTransfer; | |||
PThread['threadInitTLS'] = PThread.threadInitTLS; | |||
PThread['receiveMessageRelayPort'] = PThread.receiveMessageRelayPort; | |||
PThread['closeMessageRelayPort'] = PThread.closeMessageRelayPort; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably figure out a way to do this that isn't do hacky (one day).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about just calling these messageRelayClose
and messageReleyReceive
? (Probably just my person preference for shorter names so feel free to ignore)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree that shorter names would be nicer, but I think it's useful to clarify that we're receiving the port and not an actual message from the relay.
@@ -586,6 +588,11 @@ Object.assign(global, { | |||
return PThread.unusedWorkers.pop(); | |||
}, | |||
|
|||
receiveMessageRelayPort: function(port) { | |||
assert(ENVIRONMENT_IS_PTHREAD); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrap asserts in #if ASSERTIONS
src/library_pthread.js
Outdated
if (ENVIRONMENT_IS_NODE) { | ||
// TODO: Node 16+ has btoa, so remove this when we drop support for | ||
// older Nodes. | ||
global.btoa = (s) => { return Buffer.from(s).toString('base64'); }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just (s) => Buffer.from(s).toString('base64');
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about wrapping this in if (!global.btoa)
? We might even consider using our src/polyfill/*
mechanism?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I recently learned that I can use encodeURIComponent
here and avoid all this btoa
nonsense entirely.
src/library_pthread.js
Outdated
global.MessageChannel = require('worker_threads').MessageChannel; | ||
} | ||
#endif | ||
var channel = new MessageChannel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call this relayChannel?
worker.postMessage(msg, threadParams.transferList); | ||
worker.postMessage(msg, threadParams.transferList.concat([channel.port1])); | ||
PThread.messageRelay.postMessage({ | ||
'cmd': 'create', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps add a comment here.. Such as "Send one end of the relay channel to the newly created thread, and the other end of the messageRelay worker"?
@@ -1094,8 +1135,10 @@ var LibraryPThread = { | |||
_emscripten_notify_task_queue: function(targetThreadId, currThreadId, mainThreadId, queue) { | |||
if (targetThreadId == currThreadId) { | |||
setTimeout(() => executeNotifiedProxyingQueue(queue)); | |||
} else if (targetThreadId == mainThreadId) { | |||
postMessage({'cmd' : 'processProxyingQueue', 'queue' : queue}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps worth a comment?
src/shell.js
Outdated
* @param {string|URL} url | ||
*/ | ||
let NodeWorker = nodeWorkerThreads.Worker; | ||
// Node requires data and file protocol urls to be URLs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about "Create a polyfill for the Worker web API based on nodes worker_threads
which is slightly different"
@@ -270,7 +270,22 @@ if (ENVIRONMENT_IS_NODE) { | |||
console.error('The "worker_threads" module is not supported in this node.js build - perhaps a newer version is needed?'); | |||
throw e; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated, by we should probably skip this whole try/catch in release builds.
@@ -217,6 +217,8 @@ function handleMessage(e) { | |||
#endif | |||
#endif // MODULARIZE && EXPORT_ES6 | |||
} else if (e.data.cmd === 'run') { | |||
Module['PThread'].receiveMessageRelayPort(e.data.port); | |||
e.data.port.onmessage = handleMessage; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we use the same handler for messages regardless of where they come from? interesting...
BTW, I think on the web at least we can give these workers useful names. We should certainly name this one something obvious (at least in debug builds). |
(the names show up in dev tools IIUC). |
Do you know that the status of workers-creating-workers is... because this thread could also be the thread manager if that works. |
IIRC, the latest safari allows this but previous versions did not. It would be great if we could make this worker responsible for managing thread lifetimes as well because then we could get a single, consistent picture of what threads are or are not live. |
*/ | ||
let NodeWorker = nodeWorkerThreads.Worker; | ||
// Node requires data and file protocol urls to be URLs. | ||
class Worker extends NodeWorker { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ES6 classes are not yet used within the library sources, so I think this also requires an update to settings.TRANSPILE_TO_ES5
.
Details
--- a/emcc.py
+++ b/emcc.py
@@ -2176,13 +2176,14 @@ def phase_linker_setup(options, state, newargs):
# Emscripten requires certain ES6 constructs by default in library code
# - https://caniuse.com/let : EDGE:12 FF:44 CHROME:49 SAFARI:11
# - https://caniuse.com/const : EDGE:12 FF:36 CHROME:49 SAFARI:11
+ # - https://caniuse.com/class : EDGE:13 FF:45 CHROME:49 SAFARI:9
# - https://caniuse.com/arrow-functions: : EDGE:12 FF:22 CHROME:45 SAFARI:10
# - https://caniuse.com/mdn-javascript_builtins_object_assign:
# EDGE:12 FF:34 CHROME:45 SAFARI:9
# Taking the highest requirements gives is our minimum:
- # Max Version: EDGE:12 FF:44 CHROME:49 SAFARI:11
- settings.TRANSPILE_TO_ES5 = (settings.MIN_EDGE_VERSION < 12 or
- settings.MIN_FIREFOX_VERSION < 44 or
+ # Max Version: EDGE:13 FF:45 CHROME:49 SAFARI:11
+ settings.TRANSPILE_TO_ES5 = (settings.MIN_EDGE_VERSION < 13 or
+ settings.MIN_FIREFOX_VERSION < 45 or
settings.MIN_CHROME_VERSION < 49 or
settings.MIN_SAFARI_VERSION < 110000 or
settings.MIN_IE_VERSION != 0x7FFFFFFF)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Browser versions old enough to not support classes also don't support threads at all, so I don't think this should ever need to be polyfilled. Furthermore, the polyfilling doesn't actually work because it causes node to error out with TypeError: Class constructor Worker cannot be invoked without 'new'
🤯
@sbc100 The deadlock in |
It also looks like the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm % @sbc100 's comments
I think we need to pull in the polyfill there. We try hard to avoid code size increases in that mode especially but this does fix deadlocks so we have no choice, as I see it. |
I ended up copy-pasting the polyfill to shell_minimal.js. Adding a separate file to hold the shared code and |
The same 3 firefox tests keep failing with an unresponsive http server. @sbc100, is that safe to ignore? Should I merge this manually? |
These three tests are failing reliably enough that I guess I'll install FF tomorrow and investigate further :/ |
Ok, wow, there is actually a difference in behavior between FF and Chrome here. This entire approach does not work in FF because the postMessage to the message relay via the MessagePort does not happen unless the sending worker returns to the event loop, which does not happen in the case of synchronous proxying. |
Is it that case the |
Yes, exactly. |
I left a comment on the relevant firefox bug: https://bugzilla.mozilla.org/show_bug.cgi?id=1752287 Closing this PR since it isn't portable across browsers. Maybe we can revisit this in the future along with #18633 |
Rather than forwarding messages from one pthread to another through the main
thread, forward them through a dedicated worker, decreasing message latency in
cases where the main thread is busy. The new message relay worker is spawned via
a data URL so there are no new JS files for users to distribute. Whenever a
pthread is created, a new MessageChannel is created alongside it, with one
MessagePort sent to the relay and the other MessagePort sent to the pthread's
worker. The relay receives messages on these MessagePorts and forwards them on
to their recipients' MessagePorts.
It is possible for the relay to receive a message for a thread for which the
relay has no MessagePort. This can happen when pthread_create returns a
pthread_t before the main thread has finished asynchronously spinning up a new
worker to run the thread. The spawning thread may then immediately proxy work to
the new thread, causing a message to be sent to the relay before the main thread
has notified the relay of the new thread's existence and sent the relay the new
thread's MessagePort. When this happens the relay buffers the message and
forwards it along once it has received the recipient's MessagePort.
When a pthread exits or is cancelled, it closes its MessagePort so that no
further messages will be received on it, then the main thread notifies the relay
that the thread has been destroyed so the relay can release its resources for
that thread. Messages that make it to the relay after the exiting thread has
closed its MessagePort will be silently dropped or sent to the next thread
spawned with the same ID, but we have always considered it user error to proxy
to an exiting thread, so it's unclear whether doing anything better in that case
would be worth it.