Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/api/esm.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ command flag enabled.
Provides a module-relative resolution function scoped to each module, returning
the URL string. In alignment with browser behavior, this now returns
synchronously.

> **Caveat** This can result in synchronous file-system operations, which
> can impact performance similarly to `require.resolve`.

Expand Down
83 changes: 23 additions & 60 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ const {
SafeSet,
StringPrototypeSlice,
StringPrototypeToUpperCase,
TypedArrayPrototypeSet,
TypedArrayPrototypeSlice,
globalThis,
} = primordials;

Expand All @@ -24,14 +22,11 @@ const {
ERR_INVALID_RETURN_VALUE,
} = require('internal/errors').codes;
const { URL } = require('internal/url');
const { receiveMessageOnPort } = require('worker_threads');
const {
isAnyArrayBuffer,
isArrayBufferView,
} = require('internal/util/types');
const {
deserialize,
serialize,
} = require('v8');
const {
validateObject,
validateString,
Expand Down Expand Up @@ -447,22 +442,15 @@ ObjectSetPrototypeOf(Hooks.prototype, null);
class HooksProxy {
/**
* The lock/unlock segment of the shared memory. Atomics require this to be a Int32Array. This
* segment is used to tell the main to sleep when the worker is processing, and vice versa
* (for the worker to sleep whilst the main thread is processing).
* segment is used to tell the main to sleep when the worker is processing.
* 0 -> main sleeps
* 1 -> worker sleeps
* 1 -> main wakes up
*/
#lock;

#done;
#chunkLength;

/**
* The request & response segment of the shared memory. TextEncoder/Decoder (needed to convert
* requests & responses into a format supported by the comms channel) reads and writes with
* Uint8Array.
* A MessagePort used to synchronously communicate with the worker.
*/
#data;
#syncCommPort;

#isReady = false;

Expand All @@ -472,24 +460,25 @@ class HooksProxy {
*/
constructor() {
const { InternalWorker } = require('internal/worker');
const { MessageChannel } = require('internal/worker/io');

const lock = new SharedArrayBuffer(4); // Signal to tell the other thread to sleep or wake
const done = new SharedArrayBuffer(1); // For chunking, to know whether the last chunk has been sent
const chunkLength = new SharedArrayBuffer(32); // For chunking, to know the length of the current chunk
const data = new SharedArrayBuffer(2048); // The data for the request and response
this.#lock = new Int32Array(lock);
this.#done = new Uint8Array(done);
this.#chunkLength = new Uint8Array(chunkLength);
this.#data = new Uint8Array(data);
const syncCommChannel = new MessageChannel();
this.#syncCommPort = syncCommChannel.port1;

const worker = this.worker = new InternalWorker('internal/modules/esm/worker', {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: { lock, done, chunkLength, data },
workerData: {
lock,
syncCommPort: syncCommChannel.port2,
},
transferList: [syncCommChannel.port2],
});
worker.unref(); // ! Allows the process to eventually exit when worker is in its final sleep.
worker.unref(); // ! Allows the process to eventually exit.
}

makeRequest(method, ...args) {
Expand All @@ -502,48 +491,22 @@ class HooksProxy {
this.#isReady = true;
}

const request = serialize({ method, args });
TypedArrayPrototypeSet(this.#data, request);
TypedArrayPrototypeSet(this.#chunkLength, serialize(request.byteLength));
TypedArrayPrototypeSet(this.#done, [1]);

const chunks = [];
let done = false;
while (done === false) {
this.#awaitResponse();

try {
var chunkLength = deserialize(this.#chunkLength);
} catch (err) {
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
}
if (!chunkLength) { throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); }

const chunk = TypedArrayPrototypeSlice(this.#data, 0, chunkLength);
if (!chunk) { throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); }
// Pass work to the worker.
this.#syncCommPort.postMessage({ method, args });

ArrayPrototypePush(chunks, chunk);
if (this.#done[0] === 1) { done = true; }
}
if (chunks.length === 0) { // Response should not be empty
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
}
const reassembledChunks = Buffer.concat(chunks);
const response = deserialize(reassembledChunks);
Atomics.store(this.#lock, 0, 0); // Reset lock.
Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds.

const response = receiveMessageOnPort(this.#syncCommPort).message;
if (response instanceof Error) {
// An exception was thrown in the worker thread; re-throw to crash the process
const { triggerUncaughtException } = internalBinding('errors');
triggerUncaughtException(response);
} else if (!response) {
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
} else {
return response;
}

return response;
}

#awaitResponse() {
Atomics.store(this.#lock, 0, 0); // Send request to worker
Atomics.notify(this.#lock, 0); // Notify worker of new request
Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds
}
}
ObjectSetPrototypeOf(HooksProxy.prototype, null);
Expand Down
75 changes: 32 additions & 43 deletions lib/internal/modules/esm/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ const {
Int32Array,
ReflectApply,
SafeWeakMap,
TypedArrayPrototypeSet,
TypedArrayPrototypeSlice,
Uint8Array,
globalThis: {
Atomics,
},
Expand All @@ -24,9 +21,7 @@ const { workerData } = require('worker_threads');
// lock = 0 -> main sleeps
// lock = 1 -> worker sleeps
const lock = new Int32Array(workerData.lock); // Required by Atomics
const done = new Uint8Array(workerData.done); // Coordinate chunks between main and worker
const chunkLength = new Uint8Array(workerData.chunkLength); // Coordinate chunks between main and worker
const data = new Uint8Array(workerData.data); // Chunks content
const { syncCommPort } = workerData; // To receive work signals.

function releaseLock() {
Atomics.store(lock, 0, 1); // Send response to main
Expand All @@ -49,56 +44,50 @@ function releaseLock() {
initializationError = exception;
}

// ! Put as little above this line as possible
releaseLock(); // Send 'ready' signal to main
syncCommPort.on('message', handleSyncMessage);

// Preserve state across iterations of the loop so that we can return responses in chunks
let serializedResponse, chunksCount, chunksSent = 0;
while (true) { // The loop is needed in order to cycle through requests
Atomics.wait(lock, 0, 1); // This pauses the while loop
// ! Put as little above this line as possible.
releaseLock(); // Send 'ready' signal to main.

async function handleSyncMessage({ method, args }) {
// Each potential exception needs to be caught individually so that the correct error is sent to the main thread.
let response;
if (initializationError) {
serializedResponse = serialize(initializationError);
chunksCount = 1;
} else if (done[0] !== 0) { // Not currently sending chunks to main thread; process new request
const requestLength = deserialize(chunkLength);
const { method, args } = deserialize(data.slice(0, requestLength));
response = initializationError;
} else {
if (!hooks[method]) {
throw new ERR_INVALID_ARG_VALUE('method', method);
}

const response = await ReflectApply(hooks[method], hooks, args);
if (!response) {
throw new ERR_INVALID_RETURN_VALUE('object', method, response)
try {
response = await ReflectApply(hooks[method], hooks, args);
if (!response) {
throw new ERR_INVALID_RETURN_VALUE('object', method, response);
}
} catch (exception) {
response = exception;
}

serializedResponse = serialize(response);
chunksCount = Math.ceil(serializedResponse.byteLength / data.length);
chunksSent = 0;
}

const startIndex = chunksSent * data.length;
const endIndex = startIndex + data.length;
const chunk = TypedArrayPrototypeSlice(serializedResponse, startIndex, endIndex);
const isLastChunk = chunksSent === chunksCount - 1;
TypedArrayPrototypeSet(data, chunk);
TypedArrayPrototypeSet(chunkLength, serialize(chunk.byteLength));
TypedArrayPrototypeSet(done, isLastChunk ? [1] : [0]);
if (isLastChunk) {
serializedResponse = undefined;
chunksCount = undefined;
chunksSent = 0;
} else {
chunksSent++;
// Send the method response (or exception) to the main thread.
try {
syncCommPort.postMessage(response);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure the cause of the huge performance difference between SharedArrayBuffer and postMessage() is because with the SAB, it's a single shared memory space (so the transfer across threads is merely a context switch and nothing is moved/copied); but with postMessage(), when only the first arg is used, the response is copied (which is relatively expensive, on top of consuming double the space).

I think if you instead transfer the response via the 2nd param, we'll see more similar (better) performance with postMessage()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 2nd param of postMessage can only be used with transferable objects. response is not transferable. I still haven't seen any benchmark or perf numbers so I'm not really inclined to talk about it just based on assumptions.

} catch (exception) {
// Or send the exception thrown when trying to send the response.
syncCommPort.postMessage(exception);
}
releaseLock();
}
})().catch((exception) => {
})().catch(exception => {
// Send the exception up to the main thread so it can throw it and crash the process
process._rawDebug('exception in worker:', exception)
const chunk = serialize(exception);
TypedArrayPrototypeSet(data, chunk);
TypedArrayPrototypeSet(chunkLength, serialize(chunk.byteLength));
TypedArrayPrototypeSet(done, [1]);
process._rawDebug('exception in worker:', exception) // TODO: Remove this once exception handling is reliable
syncCommPort.postMessage(exception);
releaseLock();
});

process.on('uncaughtException', (err) => {
process._rawDebug('process uncaughtException:', err);
const { triggerUncaughtException } = internalBinding('errors');
releaseLock();
triggerUncaughtException(err);
});