diff --git a/e2e/__tests__/infinite-loop.test.ts b/e2e/__tests__/infinite-loop.test.ts new file mode 100644 index 000000000000..a0b84db3fd77 --- /dev/null +++ b/e2e/__tests__/infinite-loop.test.ts @@ -0,0 +1,37 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ +import {tmpdir} from 'os'; +import * as path from 'path'; +import { + cleanup, + generateTestFilesToForceUsingWorkers, + writeFiles, +} from '../Utils'; +import runJest from '../runJest'; + +const DIR = path.resolve(tmpdir(), 'inspector'); + +afterEach(() => cleanup(DIR)); + +it('fails a test which causes an infinite loop', () => { + const testFiles = generateTestFilesToForceUsingWorkers(); + + writeFiles(DIR, { + ...testFiles, + '__tests__/inspector.test.js': ` + test('infinite loop error', () => { + while(true) {} + }); + `, + 'package.json': '{}', + }); + + const {exitCode, stderr} = runJest(DIR, ['--maxWorkers=2']); + + expect(exitCode).toBe(1); + expect(stderr).toMatch(/worker.+unresponsive/); +}); diff --git a/packages/jest-haste-map/src/__tests__/index.test.js b/packages/jest-haste-map/src/__tests__/index.test.js index d400e73cbe68..7170aae37238 100644 --- a/packages/jest-haste-map/src/__tests__/index.test.js +++ b/packages/jest-haste-map/src/__tests__/index.test.js @@ -20,7 +20,7 @@ jest.mock('child_process', () => ({ })); jest.mock('jest-worker', () => ({ - Worker: jest.fn(worker => { + create: jest.fn(worker => { mockWorker = jest.fn((...args) => require(worker).worker(...args)); mockEnd = jest.fn(); diff --git a/packages/jest-haste-map/src/index.ts b/packages/jest-haste-map/src/index.ts index 5969c12df484..f8826999e287 100644 --- a/packages/jest-haste-map/src/index.ts +++ b/packages/jest-haste-map/src/index.ts @@ -217,7 +217,7 @@ export default class HasteMap extends EventEmitter { private _console: Console; private _options: InternalOptions; private _watchers: Array; - private _worker: WorkerInterface | null; + private _worker: Promise | null; constructor(options: Options) { super(); @@ -543,14 +543,16 @@ export default class HasteMap extends EventEmitter { if (this._options.retainAllFiles && filePath.includes(NODE_MODULES)) { if (computeSha1) { return this._getWorker(workerOptions) - .getSha1({ - computeDependencies: this._options.computeDependencies, - computeSha1, - dependencyExtractor: this._options.dependencyExtractor, - filePath, - hasteImplModulePath: this._options.hasteImplModulePath, - rootDir, - }) + .then(workerInstance => + workerInstance.getSha1({ + computeDependencies: this._options.computeDependencies, + computeSha1, + dependencyExtractor: this._options.dependencyExtractor, + filePath, + hasteImplModulePath: this._options.hasteImplModulePath, + rootDir, + }), + ) .then(workerReply, workerError); } @@ -619,14 +621,16 @@ export default class HasteMap extends EventEmitter { } return this._getWorker(workerOptions) - .worker({ - computeDependencies: this._options.computeDependencies, - computeSha1, - dependencyExtractor: this._options.dependencyExtractor, - filePath, - hasteImplModulePath: this._options.hasteImplModulePath, - rootDir, - }) + .then(workerInstance => + workerInstance.worker({ + computeDependencies: this._options.computeDependencies, + computeSha1, + dependencyExtractor: this._options.dependencyExtractor, + filePath, + hasteImplModulePath: this._options.hasteImplModulePath, + rootDir, + }), + ) .then(workerReply, workerError); } @@ -711,21 +715,23 @@ export default class HasteMap extends EventEmitter { /** * Creates workers or parses files and extracts metadata in-process. */ - private _getWorker(options?: {forceInBand: boolean}): WorkerInterface { + private async _getWorker(options?: { + forceInBand: boolean; + }): Promise { if (!this._worker) { if ((options && options.forceInBand) || this._options.maxWorkers <= 1) { - this._worker = {getSha1, worker}; + this._worker = Promise.resolve({getSha1, worker}); } else { // @ts-expect-error: assignment of a worker with custom properties. - this._worker = new Worker(require.resolve('./worker'), { + this._worker = await Worker.create(require.resolve('./worker'), { exposedMethods: ['getSha1', 'worker'], maxRetries: 3, numWorkers: this._options.maxWorkers, - }) as WorkerInterface; + }); } } - return this._worker; + return this._worker!; } private _crawl(hasteMap: InternalHasteMap) { diff --git a/packages/jest-reporters/src/CoverageReporter.ts b/packages/jest-reporters/src/CoverageReporter.ts index 1883b39861e4..bd45667f03e0 100644 --- a/packages/jest-reporters/src/CoverageReporter.ts +++ b/packages/jest-reporters/src/CoverageReporter.ts @@ -153,7 +153,7 @@ export default class CoverageReporter extends BaseReporter { if (this._globalConfig.maxWorkers <= 1) { worker = require('./CoverageWorker'); } else { - worker = new Worker(require.resolve('./CoverageWorker'), { + worker = await Worker.create(require.resolve('./CoverageWorker'), { exposedMethods: ['worker'], maxRetries: 2, numWorkers: this._globalConfig.maxWorkers, diff --git a/packages/jest-runner/src/__tests__/testRunner.test.ts b/packages/jest-runner/src/__tests__/testRunner.test.ts index 16fd75e183ad..2a3583811132 100644 --- a/packages/jest-runner/src/__tests__/testRunner.test.ts +++ b/packages/jest-runner/src/__tests__/testRunner.test.ts @@ -12,7 +12,7 @@ import TestRunner from '../index'; let mockWorkerFarm; jest.mock('jest-worker', () => ({ - Worker: jest.fn( + create: jest.fn( worker => (mockWorkerFarm = { end: jest.fn().mockResolvedValue({forceExited: false}), diff --git a/packages/jest-runner/src/index.ts b/packages/jest-runner/src/index.ts index b58d881416f4..66081ed2423d 100644 --- a/packages/jest-runner/src/index.ts +++ b/packages/jest-runner/src/index.ts @@ -164,7 +164,7 @@ export default class TestRunner { } } - const worker = new Worker(TEST_WORKER_PATH, { + const worker = (await Worker.create(TEST_WORKER_PATH, { exposedMethods: ['worker'], forkOptions: {stdio: 'pipe'}, maxRetries: 3, @@ -174,7 +174,7 @@ export default class TestRunner { serializableResolvers: Array.from(resolvers.values()), }, ], - }) as WorkerInterface; + })) as WorkerInterface; if (worker.getStdout()) worker.getStdout().pipe(process.stdout); if (worker.getStderr()) worker.getStderr().pipe(process.stderr); diff --git a/packages/jest-worker/README.md b/packages/jest-worker/README.md index d89d936b368e..1e1cf6f7e65d 100644 --- a/packages/jest-worker/README.md +++ b/packages/jest-worker/README.md @@ -24,7 +24,7 @@ This example covers the minimal usage: import JestWorker from 'jest-worker'; async function main() { - const worker = new JestWorker(require.resolve('./Worker')); + const worker = await JestWorker.create(require.resolve('./Worker')); const result = await worker.hello('Alice'); // "Hello, Alice" } @@ -63,6 +63,10 @@ List of method names that can be called on the child processes from the parent p Amount of workers to spawn. Defaults to the number of CPUs minus 1. +#### `workerHeartbeatTimeout: number` (optional) + +Heartbeat timeout used to ping the parent process when child workers are alive. Defaults to 10000 ms. + #### `maxRetries: number` (optional) Maximum amount of times that a dead child can be re-spawned, per call. Defaults to `3`, pass `Infinity` to allow endless retries. @@ -156,7 +160,7 @@ This example covers the standard usage: import JestWorker from 'jest-worker'; async function main() { - const myWorker = new JestWorker(require.resolve('./Worker'), { + const myWorker = await JestWorker.create(require.resolve('./Worker'), { exposedMethods: ['foo', 'bar', 'getWorkerId'], numWorkers: 4, }); @@ -200,7 +204,7 @@ This example covers the usage with a `computeWorkerKey` method: import {Worker as JestWorker} from 'jest-worker'; async function main() { - const myWorker = new JestWorker(require.resolve('./Worker'), { + const myWorker = await JestWorker.create(require.resolve('./Worker'), { computeWorkerKey: (method, filename) => filename, }); diff --git a/packages/jest-worker/src/__performance_tests__/test.js b/packages/jest-worker/src/__performance_tests__/test.js index 87bf958b670d..7bf8b5b15284 100644 --- a/packages/jest-worker/src/__performance_tests__/test.js +++ b/packages/jest-worker/src/__performance_tests__/test.js @@ -95,11 +95,14 @@ function testJestWorker() { } } - const farm = new JestWorker(require.resolve('./workers/jest_worker'), { - exposedMethods: [method], - forkOptions: {execArgv: []}, - numWorkers: threads, - }); + const farm = await JestWorker.create( + require.resolve('./workers/jest_worker'), + { + exposedMethods: [method], + forkOptions: {execArgv: []}, + workers: threads, + }, + ); farm.getStdout().pipe(process.stdout); farm.getStderr().pipe(process.stderr); diff --git a/packages/jest-worker/src/__tests__/index.test.js b/packages/jest-worker/src/__tests__/index.test.js index ec6f091289d5..5439ba4a1caf 100644 --- a/packages/jest-worker/src/__tests__/index.test.js +++ b/packages/jest-worker/src/__tests__/index.test.js @@ -7,6 +7,8 @@ 'use strict'; +import {Session} from 'inspector'; + let Farm; let WorkerPool; let Queue; @@ -170,3 +172,9 @@ it('calls getStderr and getStdout from worker', async () => { expect(farm.getStderr()('err')).toEqual('err'); expect(farm.getStdout()('out')).toEqual('out'); }); + +it('should create a worker with an inspector attached to it', async () => { + const farm = await Farm.create('/fake-worker.js'); + + expect(farm._inspectorSession instanceof Session).toBe(true); +}); diff --git a/packages/jest-worker/src/base/BaseWorkerPool.ts b/packages/jest-worker/src/base/BaseWorkerPool.ts index 4d51517b4a33..4a4a23b7113e 100644 --- a/packages/jest-worker/src/base/BaseWorkerPool.ts +++ b/packages/jest-worker/src/base/BaseWorkerPool.ts @@ -39,14 +39,23 @@ export default class BaseWorkerPool { const stdout = mergeStream(); const stderr = mergeStream(); - const {forkOptions, maxRetries, resourceLimits, setupArgs} = options; + const { + forkOptions, + inspector, + maxRetries, + setupArgs, + resourceLimits, + workerHeartbeatTimeout, + } = options; for (let i = 0; i < options.numWorkers; i++) { const workerOptions: WorkerOptions = { forkOptions, + inspector, maxRetries, resourceLimits, setupArgs, + workerHeartbeatTimeout, workerId: i, workerPath, }; diff --git a/packages/jest-worker/src/index.ts b/packages/jest-worker/src/index.ts index 50be5683dfba..9dccfc93b782 100644 --- a/packages/jest-worker/src/index.ts +++ b/packages/jest-worker/src/index.ts @@ -7,6 +7,7 @@ /* eslint-disable local/ban-types-eventually */ +import * as inspector from 'inspector'; import {cpus} from 'os'; import Farm from './Farm'; import WorkerPool from './WorkerPool'; @@ -75,18 +76,25 @@ export class Worker { private _farm: Farm; private _options: FarmOptions; private _workerPool: WorkerPoolInterface; + private _inspectorSession: inspector.Session | undefined; - constructor(workerPath: string, options?: FarmOptions) { + constructor( + workerPath: string, + inspectorSession?: inspector.Session, + options?: FarmOptions, + ) { this._options = {...options}; this._ending = false; const workerPoolOptions: WorkerPoolOptions = { enableWorkerThreads: this._options.enableWorkerThreads ?? false, forkOptions: this._options.forkOptions ?? {}, + inspector: inspectorSession, maxRetries: this._options.maxRetries ?? 3, numWorkers: this._options.numWorkers ?? Math.max(cpus().length - 1, 1), resourceLimits: this._options.resourceLimits ?? {}, setupArgs: this._options.setupArgs ?? [], + workerHeartbeatTimeout: this._options.workerHeartbeatTimeout ?? 10000, }; if (this._options.WorkerPool) { @@ -112,6 +120,38 @@ export class Worker { this._bindExposedWorkerMethods(workerPath, this._options); } + public static create = async ( + workerPath: string, + options?: FarmOptions, + ): Promise => { + const setUpInspector = async () => { + // Open V8 Inspector + inspector.open(); + + const inspectorUrl = inspector.url(); + if (inspectorUrl) { + const session = new inspector.Session(); + session.connect(); + await new Promise((resolve, reject) => { + session.post('Debugger.enable', (err: Error) => { + if (err === null) { + resolve(); + } else { + reject(err); + } + }); + }); + return session; + } + return undefined; + }; + + const inspectorSession = await setUpInspector(); + const jestWorker = new Worker(workerPath, inspectorSession, options); + + return jestWorker; + }; + private _bindExposedWorkerMethods( workerPath: string, options: FarmOptions, @@ -154,6 +194,8 @@ export class Worker { throw new Error('Farm is ended, no more calls can be done to it'); } this._ending = true; + this._inspectorSession?.disconnect(); + inspector.close(); return this._workerPool.end(); } diff --git a/packages/jest-worker/src/types.ts b/packages/jest-worker/src/types.ts index 7211d3d93993..f5c5c9322ff7 100644 --- a/packages/jest-worker/src/types.ts +++ b/packages/jest-worker/src/types.ts @@ -7,6 +7,7 @@ import type {ForkOptions} from 'child_process'; import type {EventEmitter} from 'events'; +import type {Session} from 'inspector'; // import type {ResourceLimits} from 'worker_threads'; // This is not present in the Node 12 typings @@ -28,6 +29,9 @@ export const PARENT_MESSAGE_OK: 0 = 0; export const PARENT_MESSAGE_CLIENT_ERROR: 1 = 1; export const PARENT_MESSAGE_SETUP_ERROR: 2 = 2; export const PARENT_MESSAGE_CUSTOM: 3 = 3; +export const PARENT_MESSAGE_HEARTBEAT: 4 = 4; + +export const HEARTBEAT_ERROR: 0 = 0; export type PARENT_MESSAGE_ERROR = | typeof PARENT_MESSAGE_CLIENT_ERROR @@ -102,6 +106,7 @@ export type FarmOptions = { maxRetries?: number; numWorkers?: number; taskQueue?: TaskQueue; + workerHeartbeatTimeout?: number; WorkerPool?: ( workerPath: string, options?: WorkerPoolOptions, @@ -115,6 +120,8 @@ export type WorkerPoolOptions = { resourceLimits: ResourceLimits; maxRetries: number; numWorkers: number; + inspector: Session | undefined; + workerHeartbeatTimeout: number; enableWorkerThreads: boolean; }; @@ -122,6 +129,8 @@ export type WorkerOptions = { forkOptions: ForkOptions; resourceLimits: ResourceLimits; setupArgs: Array; + inspector: Session | undefined; + workerHeartbeatTimeout: number; maxRetries: number; workerId: number; workerPath: string; @@ -175,6 +184,10 @@ export type ParentMessageOk = [ unknown, // result ]; +export type ParentMessageHeartbeat = [ + typeof PARENT_MESSAGE_HEARTBEAT, // type +]; + export type ParentMessageError = [ PARENT_MESSAGE_ERROR, // type string, // constructor @@ -186,7 +199,8 @@ export type ParentMessageError = [ export type ParentMessage = | ParentMessageOk | ParentMessageError - | ParentMessageCustom; + | ParentMessageCustom + | ParentMessageHeartbeat; // Queue types. diff --git a/packages/jest-worker/src/workers/ChildProcessWorker.ts b/packages/jest-worker/src/workers/ChildProcessWorker.ts index ba47862cadfe..008d1e62fcde 100644 --- a/packages/jest-worker/src/workers/ChildProcessWorker.ts +++ b/packages/jest-worker/src/workers/ChildProcessWorker.ts @@ -12,11 +12,13 @@ import {stdout as stdoutSupportsColor} from 'supports-color'; import { CHILD_MESSAGE_INITIALIZE, ChildMessage, + HEARTBEAT_ERROR, OnCustomMessage, OnEnd, OnStart, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_CUSTOM, + PARENT_MESSAGE_HEARTBEAT, PARENT_MESSAGE_OK, PARENT_MESSAGE_SETUP_ERROR, ParentMessage, @@ -24,6 +26,13 @@ import { WorkerOptions, } from '../types'; +interface ErrorFrame { + columnNumber: number; + lineNumber: number; + name: string; + url: string; +} + const SIGNAL_BASE_EXIT_CODE = 128; const SIGKILL_EXIT_CODE = SIGNAL_BASE_EXIT_CODE + 9; const SIGTERM_EXIT_CODE = SIGNAL_BASE_EXIT_CODE + 15; @@ -31,6 +40,8 @@ const SIGTERM_EXIT_CODE = SIGNAL_BASE_EXIT_CODE + 15; // How long to wait after SIGTERM before sending SIGKILL const SIGKILL_DELAY = 500; +const CHILD_HEARTBEAT_INTERVAL = 1_000; + /** * This class wraps the child process and provides a nice interface to * communicate with. It takes care of: @@ -65,6 +76,8 @@ export default class ChildProcessWorker implements WorkerInterface { private _exitPromise: Promise; private _resolveExitPromise!: () => void; + private _heartbeatTimeout!: NodeJS.Timeout; + constructor(options: WorkerOptions) { this._options = options; @@ -124,6 +137,7 @@ export default class ChildProcessWorker implements WorkerInterface { false, this._options.workerPath, this._options.setupArgs, + CHILD_HEARTBEAT_INTERVAL, ]); this._child = child; @@ -146,6 +160,13 @@ export default class ChildProcessWorker implements WorkerInterface { } } + monitorHeartbeat(onExceeded: () => any): void { + clearTimeout(this._heartbeatTimeout); + this._heartbeatTimeout = setTimeout(() => { + onExceeded(); + }, this._options.workerHeartbeatTimeout).unref(); + } + private _shutdown() { // End the temporary streams so the merged streams end too if (this._fakeStream) { @@ -156,6 +177,62 @@ export default class ChildProcessWorker implements WorkerInterface { this._resolveExitPromise(); } + private async monitorHeartbeatError() { + if (this._heartbeatTimeout) { + clearTimeout(this._heartbeatTimeout); + } + + if (this._options.inspector) { + const error = new Error( + `Test worker was unresponsive for ${this._options.workerHeartbeatTimeout} milliseconds. There was an inspector connected so we were able to capture stack frames before it was terminated.`, + ); + this._options.inspector.on('Debugger.paused', (message: any) => { + const frames: Array = []; + const callFrames = message.params.callFrames.slice(0, 20); + for (const callFrame of callFrames) { + const loc = callFrame.location; + + const columnNumber = loc.columnNumber; + const lineNumber = loc.lineNumber; + const url = callFrame.url; + + const name = callFrame.scopeChain[0].name; + + frames.push({ + columnNumber, + lineNumber, + name, + url, + }); + } + + // @ts-expect-error: no index + error.type = HEARTBEAT_ERROR; + error.stack = JSON.stringify(frames); + + this._onProcessEnd(error, null); + }); + + await new Promise((resolve, reject) => { + this._options.inspector?.post('Debugger.pause', (err: Error) => { + if (err === null) { + resolve(); + } else { + reject(err); + } + }); + }); + } else { + const error = new Error( + `Test worker was unresponsive for ${this._options.workerHeartbeatTimeout} milliseconds. There was no inspector connected so we were unable to capture stack frames before it was terminated.`, + ); + // @ts-expect-error: no index + error.type = HEARTBEAT_ERROR; + + this._onProcessEnd(error, null); + } + } + private _onMessage(response: ParentMessage) { // TODO: Add appropriate type check let error: any; @@ -182,7 +259,6 @@ export default class ChildProcessWorker implements WorkerInterface { error[key] = extra[key]; } } - this._onProcessEnd(error, null); break; @@ -194,10 +270,15 @@ export default class ChildProcessWorker implements WorkerInterface { this._onProcessEnd(error, null); break; + + case PARENT_MESSAGE_HEARTBEAT: + this.monitorHeartbeat(() => this.monitorHeartbeatError()); + break; case PARENT_MESSAGE_CUSTOM: this._onCustomMessage(response[1]); break; default: + clearTimeout(this._heartbeatTimeout); throw new TypeError('Unexpected response from worker: ' + response[0]); } } @@ -216,6 +297,7 @@ export default class ChildProcessWorker implements WorkerInterface { } } else { this._shutdown(); + clearTimeout(this._heartbeatTimeout); } } @@ -250,7 +332,10 @@ export default class ChildProcessWorker implements WorkerInterface { () => this._child.kill('SIGKILL'), SIGKILL_DELAY, ); - this._exitPromise.then(() => clearTimeout(sigkillTimeout)); + this._exitPromise.then(() => { + clearTimeout(sigkillTimeout); + clearTimeout(this._heartbeatTimeout); + }); } getWorkerId(): number { diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.ts b/packages/jest-worker/src/workers/NodeThreadsWorker.ts index 0d13ce958dd4..39af57da37e2 100644 --- a/packages/jest-worker/src/workers/NodeThreadsWorker.ts +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.ts @@ -12,11 +12,13 @@ import mergeStream = require('merge-stream'); import { CHILD_MESSAGE_INITIALIZE, ChildMessage, + HEARTBEAT_ERROR, OnCustomMessage, OnEnd, OnStart, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_CUSTOM, + PARENT_MESSAGE_HEARTBEAT, PARENT_MESSAGE_OK, PARENT_MESSAGE_SETUP_ERROR, ParentMessage, @@ -24,6 +26,15 @@ import { WorkerOptions, } from '../types'; +interface ErrorFrame { + columnNumber: number; + lineNumber: number; + name: string; + url: string; +} + +const CHILD_HEARTBEAT_INTERVAL = 1_000; + export default class ExperimentalWorker implements WorkerInterface { private _worker!: Worker; private _options: WorkerOptions; @@ -41,6 +52,8 @@ export default class ExperimentalWorker implements WorkerInterface { private _resolveExitPromise!: () => void; private _forceExited: boolean; + private _heartbeatTimeout!: NodeJS.Timeout; + constructor(options: WorkerOptions) { this._options = options; @@ -106,6 +119,7 @@ export default class ExperimentalWorker implements WorkerInterface { false, this._options.workerPath, this._options.setupArgs, + CHILD_HEARTBEAT_INTERVAL, ]); this._retries++; @@ -126,6 +140,13 @@ export default class ExperimentalWorker implements WorkerInterface { } } + monitorHeartbeat(onExceeded: () => any): void { + clearTimeout(this._heartbeatTimeout); + this._heartbeatTimeout = setTimeout(() => { + onExceeded(); + }, this._options.workerHeartbeatTimeout).unref(); + } + private _shutdown() { // End the permanent stream so the merged stream end too if (this._fakeStream) { @@ -136,6 +157,62 @@ export default class ExperimentalWorker implements WorkerInterface { this._resolveExitPromise(); } + private async monitorHeartbeatError() { + if (this._heartbeatTimeout) { + clearTimeout(this._heartbeatTimeout); + } + + if (this._options.inspector) { + const error = new Error( + `Test worker was unresponsive for ${this._options.workerHeartbeatTimeout} milliseconds. There was an inspector connected so we were able to capture stack frames before it was terminated.`, + ); + this._options.inspector.on('Debugger.paused', (message: any) => { + const frames: Array = []; + const callFrames = message.params.callFrames.slice(0, 20); + for (const callFrame of callFrames) { + const loc = callFrame.location; + + const columnNumber = loc.columnNumber; + const lineNumber = loc.lineNumber; + const url = callFrame.url; + + const name = callFrame.scopeChain[0].name; + + frames.push({ + columnNumber, + lineNumber, + name, + url, + }); + } + + // @ts-expect-error: no index + error.type = HEARTBEAT_ERROR; + error.stack = JSON.stringify(frames); + + this._onProcessEnd(error, null); + }); + + await new Promise((resolve, reject) => { + this._options.inspector?.post('Debugger.pause', (err: Error) => { + if (err === null) { + resolve(); + } else { + reject(err); + } + }); + }); + } else { + const error = new Error( + `Test worker was unresponsive for ${this._options.workerHeartbeatTimeout} milliseconds. There was no inspector connected so we were unable to capture stack frames before it was terminated.`, + ); + // @ts-expect-error: no index + error.type = HEARTBEAT_ERROR; + + this._onProcessEnd(error, null); + } + } + private _onMessage(response: ParentMessage) { let error; @@ -145,6 +222,10 @@ export default class ExperimentalWorker implements WorkerInterface { break; case PARENT_MESSAGE_CLIENT_ERROR: + if (this._heartbeatTimeout) { + clearTimeout(this._heartbeatTimeout); + this.forceExit(); + } error = response[4]; if (error != null && typeof error === 'object') { @@ -174,10 +255,14 @@ export default class ExperimentalWorker implements WorkerInterface { this._onProcessEnd(error, null); break; + case PARENT_MESSAGE_HEARTBEAT: + this.monitorHeartbeat(() => this.monitorHeartbeatError()); + break; case PARENT_MESSAGE_CUSTOM: this._onCustomMessage(response[1]); break; default: + clearTimeout(this._heartbeatTimeout); throw new TypeError('Unexpected response from worker: ' + response[0]); } } @@ -191,6 +276,7 @@ export default class ExperimentalWorker implements WorkerInterface { } } else { this._shutdown(); + clearTimeout(this._heartbeatTimeout); } } @@ -201,6 +287,7 @@ export default class ExperimentalWorker implements WorkerInterface { forceExit(): void { this._forceExited = true; this._worker.terminate(); + clearTimeout(this._heartbeatTimeout); } send( diff --git a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js index e5e7e1e99694..b6e45f4a64a3 100644 --- a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js @@ -14,11 +14,15 @@ import { CHILD_MESSAGE_INITIALIZE, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_CUSTOM, + PARENT_MESSAGE_HEARTBEAT, PARENT_MESSAGE_OK, } from '../../types'; jest.useFakeTimers(); +const CHILD_HEARTBEAT_INTERVAL = 1_000; +const WORKER_HEARTBEAT_TIMEOUT = 5_000; + let Worker; let forkInterface; let childProcess; @@ -92,6 +96,7 @@ it('initializes the child process with the given workerPath', () => { forkOptions: {}, maxRetries: 3, setupArgs: ['foo', 'bar'], + workerHeartbeatTimeout: WORKER_HEARTBEAT_TIMEOUT, workerPath: '/tmp/foo/bar/baz.js', }); @@ -100,6 +105,7 @@ it('initializes the child process with the given workerPath', () => { false, '/tmp/foo/bar/baz.js', ['foo', 'bar'], + CHILD_HEARTBEAT_INTERVAL, ]); }); @@ -411,3 +417,33 @@ it('does not send SIGKILL if SIGTERM exited the process', async () => { jest.runAllTimers(); expect(forkInterface.kill.mock.calls).toEqual([['SIGTERM']]); }); + +it('calls the onProcessEnd method when we have no heartbeat message from child during n-time', () => { + jest.useFakeTimers(); + + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerHeartbeatTimeout: WORKER_HEARTBEAT_TIMEOUT, + workerPath: '/tmp/foo', + }); + + const onProcessStart = jest.fn(); + const onProcessEnd = jest.fn(); + + worker.send( + [CHILD_MESSAGE_INITIALIZE, false, 'foo', [], CHILD_HEARTBEAT_INTERVAL], + onProcessStart, + onProcessEnd, + ); + + expect(onProcessEnd).not.toHaveBeenCalled(); + + forkInterface.emit('message', [PARENT_MESSAGE_HEARTBEAT]); + + jest.advanceTimersByTime(WORKER_HEARTBEAT_TIMEOUT - 1); + expect(onProcessEnd).not.toHaveBeenCalled(); + + jest.advanceTimersByTime(1); + expect(onProcessEnd).toHaveBeenCalledTimes(1); +}); diff --git a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js index 5bc2fe215aa1..1df90202f1f2 100644 --- a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js @@ -16,6 +16,9 @@ import { PARENT_MESSAGE_OK, } from '../../types'; +const CHILD_HEARTBEAT_INTERVAL = 1_000; +const WORKER_HEARTBEAT_TIMEOUT = 5_000; + let Worker; let workerThreads; let originalExecArgv; @@ -101,6 +104,7 @@ it('initializes the thread with the given workerPath', () => { forkOptions: {}, maxRetries: 3, setupArgs: ['foo', 'bar'], + workerHeartbeatTimeout: WORKER_HEARTBEAT_TIMEOUT, workerPath: '/tmp/foo/bar/baz.js', }); @@ -109,6 +113,7 @@ it('initializes the thread with the given workerPath', () => { false, '/tmp/foo/bar/baz.js', ['foo', 'bar'], + CHILD_HEARTBEAT_INTERVAL, ]); }); diff --git a/packages/jest-worker/src/workers/__tests__/processChild.test.js b/packages/jest-worker/src/workers/__tests__/processChild.test.js index ba579e052efe..3e70cb1ee667 100644 --- a/packages/jest-worker/src/workers/__tests__/processChild.test.js +++ b/packages/jest-worker/src/workers/__tests__/processChild.test.js @@ -19,9 +19,12 @@ import { CHILD_MESSAGE_END, CHILD_MESSAGE_INITIALIZE, PARENT_MESSAGE_CLIENT_ERROR, + PARENT_MESSAGE_HEARTBEAT, PARENT_MESSAGE_OK, } from '../../types'; +const CHILD_HEARTBEAT_INTERVAL = 1_000; + let ended; let mockCount; let initializeParm = uninitializedParam; @@ -376,3 +379,25 @@ it('throws if child is not forked', () => { ]); }).toThrow(); }); + +it('should emit an heartbeat message every time interval', () => { + jest.useFakeTimers(); + + const HEARTBEATS_NUM = 3; + + process.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + false, // Not really used here, but for flow type purity. + './my-fancy-worker', + ['foo'], // Pass empty initialize params so the initialize method is called. + CHILD_HEARTBEAT_INTERVAL, + ]); + + jest.advanceTimersByTime(CHILD_HEARTBEAT_INTERVAL * HEARTBEATS_NUM); + + expect(process.send).toHaveBeenCalledTimes(HEARTBEATS_NUM); + + for (let i = 0; i < HEARTBEATS_NUM; i++) { + expect(process.send.mock.calls[i][0]).toEqual([PARENT_MESSAGE_HEARTBEAT]); + } +}); diff --git a/packages/jest-worker/src/workers/processChild.ts b/packages/jest-worker/src/workers/processChild.ts index 64d29e19e132..d64f8d03cc3c 100644 --- a/packages/jest-worker/src/workers/processChild.ts +++ b/packages/jest-worker/src/workers/processChild.ts @@ -13,13 +13,18 @@ import { ChildMessageInitialize, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_ERROR, + PARENT_MESSAGE_HEARTBEAT, PARENT_MESSAGE_OK, PARENT_MESSAGE_SETUP_ERROR, } from '../types'; +const DEFAULT_HEARTBEAT_INTERVAL_VALUE = 1_000; + let file: string | null = null; let setupArgs: Array = []; let initialized = false; +let monitorHeartbeat: NodeJS.Timeout; +let heartbeatIntervalValue: number; /** * This file is a small bootstrapper for workers. It sets up the communication @@ -40,6 +45,10 @@ const messageListener: NodeJS.MessageListener = request => { const init: ChildMessageInitialize = request; file = init[2]; setupArgs = request[3]; + heartbeatIntervalValue = request[4] || DEFAULT_HEARTBEAT_INTERVAL_VALUE; + monitorHeartbeat = setInterval(() => { + sendParentMessageHeartbeat(); + }, heartbeatIntervalValue).unref(); break; case CHILD_MESSAGE_CALL: @@ -59,6 +68,12 @@ const messageListener: NodeJS.MessageListener = request => { }; process.on('message', messageListener); +function sendParentMessageHeartbeat() { + if (process?.send) { + process.send([PARENT_MESSAGE_HEARTBEAT]); + } +} + function reportSuccess(result: unknown) { if (!process || !process.send) { throw new Error('Child can only be used on a forked process'); @@ -76,6 +91,7 @@ function reportInitializeError(error: Error) { } function reportError(error: Error, type: PARENT_MESSAGE_ERROR) { + clearInterval(monitorHeartbeat); if (!process || !process.send) { throw new Error('Child can only be used on a forked process'); } @@ -108,6 +124,7 @@ function end(): void { function exitProcess(): void { // Clean up open handles so the process ideally exits gracefully process.removeListener('message', messageListener); + clearInterval(monitorHeartbeat); } function execMethod(method: string, args: Array): void { diff --git a/packages/jest-worker/src/workers/threadChild.ts b/packages/jest-worker/src/workers/threadChild.ts index 6783ec843510..5077b94a337c 100644 --- a/packages/jest-worker/src/workers/threadChild.ts +++ b/packages/jest-worker/src/workers/threadChild.ts @@ -14,13 +14,18 @@ import { ChildMessageInitialize, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_ERROR, + PARENT_MESSAGE_HEARTBEAT, PARENT_MESSAGE_OK, PARENT_MESSAGE_SETUP_ERROR, } from '../types'; +const DEFAULT_HEARTBEAT_INTERVAL_VALUE = 1_000; + let file: string | null = null; let setupArgs: Array = []; let initialized = false; +let monitorHeartbeat: NodeJS.Timeout; +let heartbeatIntervalValue: number; /** * This file is a small bootstrapper for workers. It sets up the communication @@ -41,6 +46,10 @@ const messageListener = (request: any) => { const init: ChildMessageInitialize = request; file = init[2]; setupArgs = request[3]; + heartbeatIntervalValue = request[4] || DEFAULT_HEARTBEAT_INTERVAL_VALUE; + monitorHeartbeat = setInterval(() => { + sendParentMessageHeartbeat(); + }, heartbeatIntervalValue).unref(); break; case CHILD_MESSAGE_CALL: @@ -68,6 +77,12 @@ function reportSuccess(result: unknown) { parentPort!.postMessage([PARENT_MESSAGE_OK, result]); } +function sendParentMessageHeartbeat() { + if (process?.send) { + process.send([PARENT_MESSAGE_HEARTBEAT]); + } +} + function reportClientError(error: Error) { return reportError(error, PARENT_MESSAGE_CLIENT_ERROR); } @@ -109,6 +124,7 @@ function end(): void { function exitProcess(): void { // Clean up open handles so the worker ideally exits gracefully parentPort!.removeListener('message', messageListener); + clearInterval(monitorHeartbeat); } function execMethod(method: string, args: Array): void {