-
Notifications
You must be signed in to change notification settings - Fork 0
Esm off thread unlock worker #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,9 +4,6 @@ const { | |
| Int32Array, | ||
| ReflectApply, | ||
| SafeWeakMap, | ||
| TypedArrayPrototypeSet, | ||
| TypedArrayPrototypeSlice, | ||
| Uint8Array, | ||
| globalThis: { | ||
| Atomics, | ||
| }, | ||
|
|
@@ -24,9 +21,7 @@ const { workerData } = require('worker_threads'); | |
| // lock = 0 -> main sleeps | ||
| // lock = 1 -> worker sleeps | ||
| const lock = new Int32Array(workerData.lock); // Required by Atomics | ||
| const done = new Uint8Array(workerData.done); // Coordinate chunks between main and worker | ||
| const chunkLength = new Uint8Array(workerData.chunkLength); // Coordinate chunks between main and worker | ||
| const data = new Uint8Array(workerData.data); // Chunks content | ||
| const { syncCommPort } = workerData; // To receive work signals. | ||
|
|
||
| function releaseLock() { | ||
| Atomics.store(lock, 0, 1); // Send response to main | ||
|
|
@@ -49,56 +44,50 @@ function releaseLock() { | |
| initializationError = exception; | ||
| } | ||
|
|
||
| // ! Put as little above this line as possible | ||
| releaseLock(); // Send 'ready' signal to main | ||
| syncCommPort.on('message', handleSyncMessage); | ||
|
|
||
| // Preserve state across iterations of the loop so that we can return responses in chunks | ||
| let serializedResponse, chunksCount, chunksSent = 0; | ||
| while (true) { // The loop is needed in order to cycle through requests | ||
| Atomics.wait(lock, 0, 1); // This pauses the while loop | ||
| // ! Put as little above this line as possible. | ||
| releaseLock(); // Send 'ready' signal to main. | ||
|
|
||
| async function handleSyncMessage({ method, args }) { | ||
| // Each potential exception needs to be caught individually so that the correct error is sent to the main thread. | ||
| let response; | ||
| if (initializationError) { | ||
| serializedResponse = serialize(initializationError); | ||
| chunksCount = 1; | ||
| } else if (done[0] !== 0) { // Not currently sending chunks to main thread; process new request | ||
| const requestLength = deserialize(chunkLength); | ||
| const { method, args } = deserialize(data.slice(0, requestLength)); | ||
| response = initializationError; | ||
| } else { | ||
| if (!hooks[method]) { | ||
| throw new ERR_INVALID_ARG_VALUE('method', method); | ||
| } | ||
|
|
||
| const response = await ReflectApply(hooks[method], hooks, args); | ||
| if (!response) { | ||
| throw new ERR_INVALID_RETURN_VALUE('object', method, response) | ||
| try { | ||
| response = await ReflectApply(hooks[method], hooks, args); | ||
| if (!response) { | ||
| throw new ERR_INVALID_RETURN_VALUE('object', method, response); | ||
| } | ||
| } catch (exception) { | ||
| response = exception; | ||
| } | ||
|
|
||
| serializedResponse = serialize(response); | ||
| chunksCount = Math.ceil(serializedResponse.byteLength / data.length); | ||
| chunksSent = 0; | ||
| } | ||
|
|
||
| const startIndex = chunksSent * data.length; | ||
| const endIndex = startIndex + data.length; | ||
| const chunk = TypedArrayPrototypeSlice(serializedResponse, startIndex, endIndex); | ||
| const isLastChunk = chunksSent === chunksCount - 1; | ||
| TypedArrayPrototypeSet(data, chunk); | ||
| TypedArrayPrototypeSet(chunkLength, serialize(chunk.byteLength)); | ||
| TypedArrayPrototypeSet(done, isLastChunk ? [1] : [0]); | ||
| if (isLastChunk) { | ||
| serializedResponse = undefined; | ||
| chunksCount = undefined; | ||
| chunksSent = 0; | ||
| } else { | ||
| chunksSent++; | ||
| // Send the method response (or exception) to the main thread. | ||
| try { | ||
| syncCommPort.postMessage(response); | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm pretty sure the cause of the huge performance difference between [inline code lost in extraction]. I think if you instead transfer the response via the 2nd param, we'll see more similar (better) performance with
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The 2nd param of |
||
| } catch (exception) { | ||
| // Or send the exception thrown when trying to send the response. | ||
| syncCommPort.postMessage(exception); | ||
| } | ||
| releaseLock(); | ||
| } | ||
| })().catch((exception) => { | ||
| })().catch(exception => { | ||
| // Send the exception up to the main thread so it can throw it and crash the process | ||
| process._rawDebug('exception in worker:', exception) | ||
| const chunk = serialize(exception); | ||
| TypedArrayPrototypeSet(data, chunk); | ||
| TypedArrayPrototypeSet(chunkLength, serialize(chunk.byteLength)); | ||
| TypedArrayPrototypeSet(done, [1]); | ||
| process._rawDebug('exception in worker:', exception) // TODO: Remove this once exception handling is reliable | ||
| syncCommPort.postMessage(exception); | ||
| releaseLock(); | ||
| }); | ||
|
|
||
| process.on('uncaughtException', (err) => { | ||
| process._rawDebug('process uncaughtException:', err); | ||
| const { triggerUncaughtException } = internalBinding('errors'); | ||
| releaseLock(); | ||
| triggerUncaughtException(err); | ||
| }); | ||
Uh oh!
There was an error while loading. Please reload this page.