Skip to content

Commit 81187c2

Browse files
committed
stream: compose with async functions
Enables async function support for stream.compose. PR-URL: nodejs#39435 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 212f47a commit 81187c2

File tree

2 files changed

+54
-27
lines changed

2 files changed

+54
-27
lines changed

doc/api/stream.md

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1923,7 +1923,7 @@ failure, this can cause event listener leaks and swallowed errors.
19231923
added: REPLACEME
19241924
-->
19251925

1926-
* `streams` {Stream[]}
1926+
* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
19271927
* Returns: {stream.Duplex}
19281928

19291929
Combines two or more streams into a `Duplex` stream that writes to the
@@ -1937,6 +1937,9 @@ when passing streams to `stream.pipeline`, typically the first stream is
19371937
a readable stream and the last a writable stream, forming a closed
19381938
circuit.
19391939

1940+
If passed a `Function` it must be a factory method taking a `source`
1941+
`Iterable`.
1942+
19401943
```mjs
19411944
import { compose, Transform } from 'stream';
19421945

@@ -1946,11 +1949,11 @@ const removeSpaces = new Transform({
19461949
}
19471950
});
19481951

1949-
const toUpper = new Transform({
1950-
transform(chunk, encoding, callback) {
1951-
callback(null, String(chunk).toUpperCase());
1952+
async function* toUpper(source) {
1953+
for await (const chunk of source) {
1954+
yield String(chunk).toUpperCase();
19521955
}
1953-
});
1956+
}
19541957

19551958
let res = '';
19561959
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
@@ -1960,6 +1963,48 @@ for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
19601963
console.log(res); // prints 'HELLOWORLD'
19611964
```
19621965

1966+
`stream.compose` can be used to convert async iterables, generators and
1967+
functions into streams.
1968+
1969+
* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
1970+
`null`.
1971+
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
1972+
Must take a source `AsyncIterable` as first parameter. Cannot yield
1973+
`null`.
1974+
* `AsyncFunction` converts into a writable `Duplex`. Must return
1975+
either `null` or `undefined`.
1976+
1977+
```mjs
1978+
import { compose } from 'stream';
1979+
import { finished } from 'stream/promises';
1980+
1981+
// Convert AsyncIterable into readable Duplex.
1982+
const s1 = compose(async function*() {
1983+
yield 'Hello';
1984+
yield 'World';
1985+
}());
1986+
1987+
// Convert AsyncGenerator into transform Duplex.
1988+
const s2 = compose(async function*(source) {
1989+
for await (const chunk of source) {
1990+
yield String(chunk).toUpperCase();
1991+
}
1992+
});
1993+
1994+
let res = '';
1995+
1996+
// Convert AsyncFunction into writable Duplex.
1997+
const s3 = compose(async function(source) {
1998+
for await (const chunk of source) {
1999+
res += chunk;
2000+
}
2001+
});
2002+
2003+
await finished(compose(s1, s2, s3));
2004+
2005+
console.log(res); // prints 'HELLOWORLD'
2006+
```
2007+
19632008
### `stream.Readable.from(iterable, [options])`
19642009
<!-- YAML
19652010
added:

lib/stream.js

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,10 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33-
const _compose = require('internal/streams/compose');
33+
const compose = require('internal/streams/compose');
34+
const { destroyer } = require('internal/streams/destroy');
3435
const eos = require('internal/streams/end-of-stream');
3536
const internalBuffer = require('internal/buffer');
36-
const { isNodeStream } = require('internal/streams/utils');
37-
const {
38-
codes: {
39-
ERR_INVALID_ARG_VALUE,
40-
},
41-
} = require('internal/errors');
4237

4338
const promises = require('stream/promises');
4439

@@ -53,21 +48,8 @@ Stream.pipeline = pipeline;
5348
const { addAbortSignal } = require('internal/streams/add-abort-signal');
5449
Stream.addAbortSignal = addAbortSignal;
5550
Stream.finished = eos;
56-
57-
Stream.compose = function compose(...streams) {
58-
// TODO (ronag): Remove this once async function API
59-
// has been discussed.
60-
for (let n = 0; n < streams.length; ++n) {
61-
if (!isNodeStream(streams[n])) {
62-
throw new ERR_INVALID_ARG_VALUE(
63-
`streams[${n}]`,
64-
streams[n],
65-
'must be stream'
66-
);
67-
}
68-
}
69-
return _compose(...streams);
70-
};
51+
Stream.destroy = destroyer;
52+
Stream.compose = compose;
7153

7254
ObjectDefineProperty(Stream, 'promises', {
7355
configurable: true,

0 commit comments

Comments
 (0)