worker: add postMessageToThread · nodejs/node@73b5c16 · GitHub
Skip to content

Commit 73b5c16

Browse files
ShogunPandamarco-ippolito
authored andcommitted
worker: add postMessageToThread
PR-URL: #53682 Backport-PR-URL: #57101 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de> Reviewed-By: Marco Ippolito <marcoippolito54@gmail.com>
1 parent 9e975f1 commit 73b5c16

14 files changed

Lines changed: 685 additions & 5 deletions

doc/api/errors.md

Lines changed: 49 additions & 0 deletions

doc/api/process.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,18 @@ possible to record such errors in an error log, either periodically (which is
327327
likely best for long-running application) or upon process exit (which is likely
328328
most convenient for scripts).
329329

330+
### Event: `'workerMessage'`
331+
332+
<!-- YAML
333+
added: REPLACEME
334+
-->
335+
336+
* `value` {any} A value transmitted using [`postMessageToThread()`][].
337+
* `source` {number} The transmitting worker thread ID or `0` for the main thread.
338+
339+
The `'workerMessage'` event is emitted for any incoming message send by the other
340+
party by using [`postMessageToThread()`][].
341+
330342
### Event: `'uncaughtException'`
331343

332344
<!-- YAML
@@ -4173,6 +4185,7 @@ cases:
41734185
[`net.Server`]: net.md#class-netserver
41744186
[`net.Socket`]: net.md#class-netsocket
41754187
[`os.constants.dlopen`]: os.md#dlopen-constants
4188+
[`postMessageToThread()`]: worker_threads.md#workerpostmessagetothreadthreadid-value-transferlist-timeout
41764189
[`process.argv`]: #processargv
41774190
[`process.config`]: #processconfig
41784191
[`process.execPath`]: #processexecpath

doc/api/worker_threads.md

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,118 @@ if (isMainThread) {
220220
}
221221
```
222222

223+
## `worker.postMessageToThread(threadId, value[, transferList][, timeout])`
224+
225+
<!-- YAML
226+
added: REPLACEME
227+
-->
228+
229+
> Stability: 1.1 - Active development
230+
231+
* `destination` {number} The target thread ID. If the thread ID is invalid, a
232+
[`ERR_WORKER_MESSAGING_FAILED`][] error will be thrown. If the target thread ID is the current thread ID,
233+
a [`ERR_WORKER_MESSAGING_SAME_THREAD`][] error will be thrown.
234+
* `value` {any} The value to send.
235+
* `transferList` {Object\[]} If one or more `MessagePort`-like objects are passed in `value`,
236+
a `transferList` is required for those items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] is thrown.
237+
See [`port.postMessage()`][] for more information.
238+
* `timeout` {number} Time to wait for the message to be delivered in milliseconds.
239+
By default it's `undefined`, which means wait forever. If the operation times out,
240+
a [`ERR_WORKER_MESSAGING_TIMEOUT`][] error is thrown.
241+
* Returns: {Promise} A promise which is fulfilled if the message was successfully processed by destination thread.
242+
243+
Sends a value to another worker, identified by its thread ID.
244+
245+
If the target thread has no listener for the `workerMessage` event, then the operation will throw
246+
a [`ERR_WORKER_MESSAGING_FAILED`][] error.
247+
248+
If the target thread threw an error while processing the `workerMessage` event, then the operation will throw
249+
a [`ERR_WORKER_MESSAGING_ERRORED`][] error.
250+
251+
This method should be used when the target thread is not the direct
252+
parent or child of the current thread.
253+
If the two threads are parent-children, use the [`require('node:worker_threads').parentPort.postMessage()`][]
254+
and the [`worker.postMessage()`][] to let the threads communicate.
255+
256+
The example below shows the use of of `postMessageToThread`: it creates 10 nested threads,
257+
the last one will try to communicate with the main thread.
258+
259+
```mjs
260+
import { fileURLToPath } from 'node:url';
261+
import { once } from 'node:events';
262+
import process from 'node:process';
263+
import {
264+
isMainThread,
265+
postMessageToThread,
266+
threadId,
267+
workerData,
268+
Worker,
269+
} from 'node:worker_threads';
270+
271+
const channel = new BroadcastChannel('sync');
272+
const level = workerData?.level ?? 0;
273+
274+
if (level < 10) {
275+
const worker = new Worker(fileURLToPath(import.meta.url), {
276+
workerData: { level: level + 1 },
277+
});
278+
}
279+
280+
if (level === 0) {
281+
process.on('workerMessage', (value, source) => {
282+
console.log(`${source} -> ${threadId}:`, value);
283+
postMessageToThread(source, { message: 'pong' });
284+
});
285+
} else if (level === 10) {
286+
process.on('workerMessage', (value, source) => {
287+
console.log(`${source} -> ${threadId}:`, value);
288+
channel.postMessage('done');
289+
channel.close();
290+
});
291+
292+
await postMessageToThread(0, { message: 'ping' });
293+
}
294+
295+
channel.onmessage = channel.close;
296+
```
297+
298+
```cjs
299+
const { once } = require('node:events');
300+
const {
301+
isMainThread,
302+
postMessageToThread,
303+
threadId,
304+
workerData,
305+
Worker,
306+
} = require('node:worker_threads');
307+
308+
const channel = new BroadcastChannel('sync');
309+
const level = workerData?.level ?? 0;
310+
311+
if (level < 10) {
312+
const worker = new Worker(__filename, {
313+
workerData: { level: level + 1 },
314+
});
315+
}
316+
317+
if (level === 0) {
318+
process.on('workerMessage', (value, source) => {
319+
console.log(`${source} -> ${threadId}:`, value);
320+
postMessageToThread(source, { message: 'pong' });
321+
});
322+
} else if (level === 10) {
323+
process.on('workerMessage', (value, source) => {
324+
console.log(`${source} -> ${threadId}:`, value);
325+
channel.postMessage('done');
326+
channel.close();
327+
});
328+
329+
postMessageToThread(0, { message: 'ping' });
330+
}
331+
332+
channel.onmessage = channel.close;
333+
```
334+
223335
## `worker.receiveMessageOnPort(port)`
224336
225337
<!-- YAML
@@ -1361,6 +1473,10 @@ thread spawned will spawn another until the application crashes.
13611473
[`Buffer.allocUnsafe()`]: buffer.md#static-method-bufferallocunsafesize
13621474
[`Buffer`]: buffer.md
13631475
[`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`]: errors.md#err_missing_message_port_in_transfer_list
1476+
[`ERR_WORKER_MESSAGING_ERRORED`]: errors.md#err_worker_messaging_errored
1477+
[`ERR_WORKER_MESSAGING_FAILED`]: errors.md#err_worker_messaging_failed
1478+
[`ERR_WORKER_MESSAGING_SAME_THREAD`]: errors.md#err_worker_messaging_same_thread
1479+
[`ERR_WORKER_MESSAGING_TIMEOUT`]: errors.md#err_worker_messaging_timeout
13641480
[`ERR_WORKER_NOT_RUNNING`]: errors.md#err_worker_not_running
13651481
[`EventTarget`]: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
13661482
[`FileHandle`]: fs.md#class-filehandle

lib/internal/errors.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1911,6 +1911,10 @@ E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
19111911
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
19121912
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
19131913
Error);
1914+
E('ERR_WORKER_MESSAGING_ERRORED', 'The destination thread threw an error while processing the message', Error);
1915+
E('ERR_WORKER_MESSAGING_FAILED', 'Cannot find the destination thread or listener', Error);
1916+
E('ERR_WORKER_MESSAGING_SAME_THREAD', 'Cannot sent a message to the same thread', Error);
1917+
E('ERR_WORKER_MESSAGING_TIMEOUT', 'Sending a message to another thread timed out', Error);
19141918
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
19151919
E('ERR_WORKER_OUT_OF_MEMORY',
19161920
'Worker terminated due to reaching memory limit: %s', Error);

lib/internal/main/worker_thread.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ const {
4444
kStdioWantsMoreDataCallback,
4545
} = workerIo;
4646

47+
const { setupMainThreadPort } = require('internal/worker/messaging');
48+
4749
const {
4850
onGlobalUncaughtException,
4951
} = require('internal/process/execution');
@@ -99,6 +101,7 @@ port.on('message', (message) => {
99101
manifestURL,
100102
publicPort,
101103
workerData,
104+
mainThreadPort,
102105
} = message;
103106

104107
if (doEval !== 'internal') {
@@ -112,6 +115,7 @@ port.on('message', (message) => {
112115
}
113116

114117
require('internal/worker').assignEnvironmentData(environmentData);
118+
setupMainThreadPort(mainThreadPort);
115119

116120
if (SharedArrayBuffer !== undefined && Atomics !== undefined) {
117121
// The counter is only passed to the workers created by the main thread,

lib/internal/worker.js

Lines changed: 11 additions & 4 deletions

0 commit comments

Comments
 (0)