Skip to content

Commit 9c8baa3

Browse files
atlowChemiCeres6
authored andcommitted
stream: use addAbortListener
PR-URL: nodejs#48550 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 961d209 commit 9c8baa3

5 files changed

Lines changed: 39 additions & 26 deletions

File tree

lib/internal/streams/add-abort-signal.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
'use strict';
22

3+
const {
4+
SymbolDispose,
5+
} = primordials;
6+
37
const {
48
AbortError,
59
codes,
@@ -13,6 +17,7 @@ const {
1317

1418
const eos = require('internal/streams/end-of-stream');
1519
const { ERR_INVALID_ARG_TYPE } = codes;
20+
let addAbortListener;
1621

1722
// This method is inlined here for readable-stream
1823
// It also does not allow for signal to not exist on the stream
@@ -46,8 +51,9 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) {
4651
if (signal.aborted) {
4752
onAbort();
4853
} else {
49-
signal.addEventListener('abort', onAbort);
50-
eos(stream, () => signal.removeEventListener('abort', onAbort));
54+
addAbortListener ??= require('events').addAbortListener;
55+
const disposable = addAbortListener(signal, onAbort);
56+
eos(stream, disposable[SymbolDispose]);
5157
}
5258
return stream;
5359
};

lib/internal/streams/end-of-stream.js

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ const {
2222
validateBoolean,
2323
} = require('internal/validators');
2424

25-
const { Promise, PromisePrototypeThen } = primordials;
25+
const {
26+
Promise,
27+
PromisePrototypeThen,
28+
SymbolDispose,
29+
} = primordials;
2630

2731
const {
2832
isClosed,
@@ -40,6 +44,7 @@ const {
4044
willEmitClose: _willEmitClose,
4145
kIsClosedPromise,
4246
} = require('internal/streams/utils');
47+
let addAbortListener;
4348

4449
function isRequest(stream) {
4550
return stream.setHeader && typeof stream.abort === 'function';
@@ -249,12 +254,13 @@ function eos(stream, options, callback) {
249254
if (options.signal.aborted) {
250255
process.nextTick(abort);
251256
} else {
257+
addAbortListener ??= require('events').addAbortListener;
258+
const disposable = addAbortListener(options.signal, abort);
252259
const originalCallback = callback;
253260
callback = once((...args) => {
254-
options.signal.removeEventListener('abort', abort);
261+
disposable[SymbolDispose]();
255262
originalCallback.apply(stream, args);
256263
});
257-
options.signal.addEventListener('abort', abort);
258264
}
259265
}
260266

@@ -272,12 +278,13 @@ function eosWeb(stream, options, callback) {
272278
if (options.signal.aborted) {
273279
process.nextTick(abort);
274280
} else {
281+
addAbortListener ??= require('events').addAbortListener;
282+
const disposable = addAbortListener(options.signal, abort);
275283
const originalCallback = callback;
276284
callback = once((...args) => {
277-
options.signal.removeEventListener('abort', abort);
285+
disposable[SymbolDispose]();
278286
originalCallback.apply(stream, args);
279287
});
280-
options.signal.addEventListener('abort', abort);
281288
}
282289
}
283290
const resolverFn = (...args) => {

lib/internal/streams/operators.js

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict';
22

3-
const { AbortController } = require('internal/abort_controller');
3+
const { AbortController, AbortSignal } = require('internal/abort_controller');
44

55
const {
66
codes: {
@@ -16,7 +16,7 @@ const {
1616
validateInteger,
1717
validateObject,
1818
} = require('internal/validators');
19-
const { kWeakHandler } = require('internal/event_target');
19+
const { kWeakHandler, kResistStopPropagation } = require('internal/event_target');
2020
const { finished } = require('internal/streams/end-of-stream');
2121
const staticCompose = require('internal/streams/compose');
2222
const {
@@ -26,6 +26,7 @@ const { isWritable, isNodeStream } = require('internal/streams/utils');
2626

2727
const {
2828
ArrayPrototypePush,
29+
Boolean,
2930
MathFloor,
3031
Number,
3132
NumberIsNaN,
@@ -83,19 +84,11 @@ function map(fn, options) {
8384
validateInteger(concurrency, 'concurrency', 1);
8485

8586
return async function* map() {
86-
const ac = new AbortController();
87+
const signal = AbortSignal.any([options?.signal].filter(Boolean));
8788
const stream = this;
8889
const queue = [];
89-
const signal = ac.signal;
9090
const signalOpt = { signal };
9191

92-
const abort = () => ac.abort();
93-
if (options?.signal?.aborted) {
94-
abort();
95-
}
96-
97-
options?.signal?.addEventListener('abort', abort);
98-
9992
let next;
10093
let resume;
10194
let done = false;
@@ -152,7 +145,6 @@ function map(fn, options) {
152145
next();
153146
next = null;
154147
}
155-
options?.signal?.removeEventListener('abort', abort);
156148
}
157149
}
158150

@@ -187,8 +179,6 @@ function map(fn, options) {
187179
});
188180
}
189181
} finally {
190-
ac.abort();
191-
192182
done = true;
193183
if (resume) {
194184
resume();
@@ -281,7 +271,7 @@ async function reduce(reducer, initialValue, options) {
281271
const ac = new AbortController();
282272
const signal = ac.signal;
283273
if (options?.signal) {
284-
const opts = { once: true, [kWeakHandler]: this };
274+
const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true };
285275
options.signal.addEventListener('abort', () => ac.abort(), opts);
286276
}
287277
let gotAnyItemFromStream = false;

lib/internal/streams/pipeline.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const {
77
ArrayIsArray,
88
Promise,
99
SymbolAsyncIterator,
10+
SymbolDispose,
1011
} = primordials;
1112

1213
const eos = require('internal/streams/end-of-stream');
@@ -44,6 +45,7 @@ const { AbortController } = require('internal/abort_controller');
4445

4546
let PassThrough;
4647
let Readable;
48+
let addAbortListener;
4749

4850
function destroyer(stream, reading, writing) {
4951
let finished = false;
@@ -206,7 +208,11 @@ function pipelineImpl(streams, callback, opts) {
206208
finishImpl(new AbortError());
207209
}
208210

209-
outerSignal?.addEventListener('abort', abort);
211+
addAbortListener ??= require('events').addAbortListener;
212+
let disposable;
213+
if (outerSignal) {
214+
disposable = addAbortListener(outerSignal, abort);
215+
}
210216

211217
let error;
212218
let value;
@@ -231,7 +237,7 @@ function pipelineImpl(streams, callback, opts) {
231237
destroys.shift()(error);
232238
}
233239

234-
outerSignal?.removeEventListener('abort', abort);
240+
disposable?.[SymbolDispose]();
235241
ac.abort();
236242

237243
if (final) {

lib/internal/webstreams/readablestream.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const {
2222
SafePromiseAll,
2323
Symbol,
2424
SymbolAsyncIterator,
25+
SymbolDispose,
2526
SymbolToStringTag,
2627
Uint8Array,
2728
} = primordials;
@@ -140,6 +141,7 @@ const kRelease = Symbol('kRelease');
140141

141142
let releasedError;
142143
let releasingError;
144+
let addAbortListener;
143145

144146
const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm;
145147

@@ -1258,6 +1260,7 @@ function readableStreamPipeTo(
12581260

12591261
let reader;
12601262
let writer;
1263+
let disposable;
12611264
// Both of these can throw synchronously. We want to capture
12621265
// the error and return a rejected promise instead.
12631266
try {
@@ -1290,7 +1293,7 @@ function readableStreamPipeTo(
12901293
writableStreamDefaultWriterRelease(writer);
12911294
readableStreamReaderGenericRelease(reader);
12921295
if (signal !== undefined)
1293-
signal.removeEventListener('abort', abortAlgorithm);
1296+
disposable?.[SymbolDispose]();
12941297
if (rejected)
12951298
promise.reject(error);
12961299
else
@@ -1417,7 +1420,8 @@ function readableStreamPipeTo(
14171420
abortAlgorithm();
14181421
return promise.promise;
14191422
}
1420-
signal.addEventListener('abort', abortAlgorithm, { once: true });
1423+
addAbortListener ??= require('events').addAbortListener;
1424+
disposable = addAbortListener(signal, abortAlgorithm);
14211425
}
14221426

14231427
setPromiseHandled(run());

0 commit comments

Comments
 (0)