Skip to content

Commit 44744d9

Browse files
committed
stream: compose with async functions
Enables async function support for stream.compose. PR-URL: nodejs#39435 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent d114c0a commit 44744d9

File tree

2 files changed

+54
-27
lines changed

2 files changed

+54
-27
lines changed

doc/api/stream.md

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,7 +1862,7 @@ failure, this can cause event listener leaks and swallowed errors.
18621862
added: REPLACEME
18631863
-->
18641864

1865-
* `streams` {Stream[]}
1865+
* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
18661866
* Returns: {stream.Duplex}
18671867

18681868
Combines two or more streams into a `Duplex` stream that writes to the
@@ -1876,6 +1876,9 @@ when passing streams to `stream.pipeline`, typically the first stream is
18761876
a readable stream and the last a writable stream, forming a closed
18771877
circuit.
18781878

1879+
If passed a `Function` it must be a factory method taking a `source`
1880+
`Iterable`.
1881+
18791882
```mjs
18801883
import { compose, Transform } from 'stream';
18811884

@@ -1885,11 +1888,11 @@ const removeSpaces = new Transform({
18851888
}
18861889
});
18871890

1888-
const toUpper = new Transform({
1889-
transform(chunk, encoding, callback) {
1890-
callback(null, String(chunk).toUpperCase());
1891+
async function* toUpper(source) {
1892+
for await (const chunk of source) {
1893+
yield String(chunk).toUpperCase();
18911894
}
1892-
});
1895+
}
18931896

18941897
let res = '';
18951898
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
@@ -1899,6 +1902,48 @@ for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
18991902
console.log(res); // prints 'HELLOWORLD'
19001903
```
19011904

1905+
`stream.compose` can be used to convert async iterables, generators and
1906+
functions into streams.
1907+
1908+
* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
1909+
`null`.
1910+
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
1911+
Must take a source `AsyncIterable` as first parameter. Cannot yield
1912+
`null`.
1913+
* `AsyncFunction` converts into a writable `Duplex`. Must return
1914+
either `null` or `undefined`.
1915+
1916+
```mjs
1917+
import { compose } from 'stream';
1918+
import { finished } from 'stream/promises';
1919+
1920+
// Convert AsyncIterable into readable Duplex.
1921+
const s1 = compose(async function*() {
1922+
yield 'Hello';
1923+
yield 'World';
1924+
}());
1925+
1926+
// Convert AsyncGenerator into transform Duplex.
1927+
const s2 = compose(async function*(source) {
1928+
for await (const chunk of source) {
1929+
yield String(chunk).toUpperCase();
1930+
}
1931+
});
1932+
1933+
let res = '';
1934+
1935+
// Convert AsyncFunction into writable Duplex.
1936+
const s3 = compose(async function(source) {
1937+
for await (const chunk of source) {
1938+
res += chunk;
1939+
}
1940+
});
1941+
1942+
await finished(compose(s1, s2, s3));
1943+
1944+
console.log(res); // prints 'HELLOWORLD'
1945+
```
1946+
19021947
### `stream.Readable.from(iterable, [options])`
19031948
<!-- YAML
19041949
added:

lib/stream.js

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,10 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33-
const _compose = require('internal/streams/compose');
33+
const compose = require('internal/streams/compose');
34+
const { destroyer } = require('internal/streams/destroy');
3435
const eos = require('internal/streams/end-of-stream');
3536
const internalBuffer = require('internal/buffer');
36-
const { isNodeStream } = require('internal/streams/utils');
37-
const {
38-
codes: {
39-
ERR_INVALID_ARG_VALUE,
40-
},
41-
} = require('internal/errors');
4237

4338
const promises = require('stream/promises');
4439

@@ -52,21 +47,8 @@ Stream.pipeline = pipeline;
5247
const { addAbortSignal } = require('internal/streams/add-abort-signal');
5348
Stream.addAbortSignal = addAbortSignal;
5449
Stream.finished = eos;
55-
56-
Stream.compose = function compose(...streams) {
57-
// TODO (ronag): Remove this once async function API
58-
// has been discussed.
59-
for (let n = 0; n < streams.length; ++n) {
60-
if (!isNodeStream(streams[n])) {
61-
throw new ERR_INVALID_ARG_VALUE(
62-
`streams[${n}]`,
63-
streams[n],
64-
'must be stream'
65-
);
66-
}
67-
}
68-
return _compose(...streams);
69-
};
50+
Stream.destroy = destroyer;
51+
Stream.compose = compose;
7052

7153
ObjectDefineProperty(Stream, 'promises', {
7254
configurable: true,

0 commit comments

Comments
 (0)