55 ArrayPrototypeMap,
66 ArrayPrototypePush,
77 AtomicsAdd,
8+ AtomicsNotify,
9+ AtomicsStore,
10+ AtomicsWaitAsync,
811 Float64Array,
912 FunctionPrototypeBind,
13+ Int32Array,
1014 JSONStringify,
1115 MathMax,
1216 ObjectEntries,
@@ -34,12 +38,14 @@ const {
3438
3539const errorCodes = require ( 'internal/errors' ) . codes ;
3640const {
41+ ERR_INVALID_ARG_TYPE ,
42+ ERR_INVALID_ARG_VALUE ,
43+ ERR_WORKER_CONNECTION_REFUSED ,
44+ ERR_WORKER_INVALID_EXEC_ARGV ,
3745 ERR_WORKER_NOT_RUNNING ,
3846 ERR_WORKER_PATH ,
47+ ERR_WORKER_SAME_THREAD ,
3948 ERR_WORKER_UNSERIALIZABLE_ERROR ,
40- ERR_WORKER_INVALID_EXEC_ARGV ,
41- ERR_INVALID_ARG_TYPE ,
42- ERR_INVALID_ARG_VALUE ,
4349} = errorCodes ;
4450
4551const workerIo = require ( 'internal/worker/io' ) ;
@@ -59,7 +65,7 @@ const {
5965const { deserializeError } = require ( 'internal/error_serdes' ) ;
6066const { fileURLToPath, isURL, pathToFileURL } = require ( 'internal/url' ) ;
6167const { kEmptyObject } = require ( 'internal/util' ) ;
62- const { validateArray, validateString } = require ( 'internal/validators' ) ;
68+ const { validateArray, validateFunction , validateString } = require ( 'internal/validators' ) ;
6369const {
6470 throwIfBuildingSnapshot,
6571} = require ( 'internal/v8/startup_snapshot' ) ;
@@ -74,6 +80,8 @@ const {
7480 kCodeRangeSizeMb,
7581 kStackSizeMb,
7682 kTotalResourceLimitCount,
83+ sendToWorker,
84+ setMainPort,
7785} = internalBinding ( 'worker' ) ;
7886
7987const kHandle = Symbol ( 'kHandle' ) ;
@@ -100,6 +108,14 @@ let cwdCounter;
100108
101109const environmentData = new SafeMap ( ) ;
102110
111+ // SharedArrayBuffer must always be Int32, so it's * 4.
112+ // We need one for the operation status (performing / performed) and one for the result (success / failure).
113+ const WORKER_MESSAGING_SHARED_DATA = 2 * 4 ;
114+ const WORKER_MESSAGING_STATUS_INDEX = 0 ;
115+ const WORKER_MESSAGING_RESULT_INDEX = 1 ;
116+ let connectionsListener = null ;
117+ let mainPortWasSetup = false ;
118+
103119// SharedArrayBuffers can be disabled with --enable-sharedarraybuffer-per-context.
104120if ( isMainThread && SharedArrayBuffer !== undefined ) {
105121 cwdCounter = new Uint32Array ( new SharedArrayBuffer ( 4 ) ) ;
@@ -527,6 +543,79 @@ function eventLoopUtilization(util1, util2) {
527543 ) ;
528544}
529545
546+ function setConnectionsListener ( fn ) {
547+ if ( isMainThread && ! mainPortWasSetup ) {
548+ setupMainPort ( ) ;
549+ mainPortWasSetup = true ;
550+ }
551+
552+ if ( typeof fn === 'undefined' || fn === null ) {
553+ connectionsListener = null ;
554+ return ;
555+ }
556+
557+ validateFunction ( fn , 'fn' ) ;
558+ connectionsListener = fn ;
559+ }
560+
561+ async function processConnectionRequest ( request ) {
562+ const status = new Int32Array ( request . memory ) ;
563+
564+ try {
565+ const result = await connectionsListener ?. ( request . sender , request . port , request . data ) ;
566+ AtomicsStore ( status , WORKER_MESSAGING_RESULT_INDEX , result === true ? 0 : 1 ) ;
567+ } catch ( e ) {
568+ debug ( 'connections listener rejected' , e ) ;
569+ AtomicsStore ( status , WORKER_MESSAGING_RESULT_INDEX , 2 ) ;
570+ } finally {
571+ AtomicsNotify ( status , WORKER_MESSAGING_STATUS_INDEX , 1 ) ;
572+ }
573+ }
574+
575+ async function connect ( target , data ) {
576+ if ( target === threadId ) {
577+ throw new ERR_WORKER_SAME_THREAD ( ) ;
578+ }
579+
580+ // Create a shared array to exchange the status and the result
581+ const memory = new SharedArrayBuffer ( WORKER_MESSAGING_SHARED_DATA ) ;
582+ const status = new Int32Array ( memory ) ;
583+ const promise = AtomicsWaitAsync ( status , WORKER_MESSAGING_STATUS_INDEX , 0 ) . value ;
584+
585+ // Create the channel and send it to the other thread
586+ const { port1, port2 } = new MessageChannel ( ) ;
587+ sendToWorker ( target , { type : messageTypes . CONNECT , sender : threadId , port : port2 , memory, data } , [ port2 ] ) ;
588+
589+ // Wait for the response
590+ await promise ;
591+
592+ if ( status [ WORKER_MESSAGING_RESULT_INDEX ] === 1 ) {
593+ port1 . close ( ) ;
594+ port2 . close ( ) ;
595+ throw new ERR_WORKER_CONNECTION_REFUSED ( ) ;
596+ }
597+
598+ return port1 ;
599+ }
600+
601+ function setupMainPort ( ) {
602+ const { port1, port2 } = new MessageChannel ( ) ;
603+ setMainPort ( port2 ) ;
604+
605+ // Set message management
606+ port1 . on ( 'message' , ( message ) => {
607+ if ( message . type === messageTypes . CONNECT ) {
608+ processConnectionRequest ( message ) ;
609+ } else {
610+ assert ( message . type === messageTypes . CONNECT , `Unknown worker message type ${ message . type } ` ) ;
611+ }
612+ } ) ;
613+
614+ // Never block the process on this channel
615+ port1 . unref ( ) ;
616+ port2 . unref ( ) ;
617+ }
618+
530619module . exports = {
531620 ownsProcessState,
532621 kIsOnline,
@@ -537,6 +626,10 @@ module.exports = {
537626 setEnvironmentData,
538627 getEnvironmentData,
539628 assignEnvironmentData,
629+ setConnectionsListener,
630+ setupMainPort,
631+ processConnectionRequest,
632+ connect,
540633 threadId,
541634 InternalWorker,
542635 Worker,
0 commit comments