@@ -35,6 +35,7 @@ const {
3535 SymbolAsyncIterator,
3636 Symbol,
3737 TypeError,
38+ Uint8Array,
3839} = primordials ;
3940
4041module . exports = Readable ;
@@ -45,6 +46,8 @@ const { Stream, prependListener } = require('internal/streams/legacy');
4546const { Buffer } = require ( 'buffer' ) ;
4647
4748let Blob ;
49+ let ReadableStream ;
50+ let CountQueuingStrategy ;
4851
4952const {
5053 addAbortSignal,
@@ -75,9 +78,11 @@ const { validateObject } = require('internal/validators');
7578
7679const kPaused = Symbol ( 'kPaused' ) ;
7780const kConsume = Symbol ( 'kConsume' ) ;
81+ const kReading = Symbol ( 'kReading' ) ;
7882
7983const { StringDecoder } = require ( 'string_decoder' ) ;
8084const from = require ( 'internal/streams/from' ) ;
85+ const assert = require ( 'internal/assert' ) ;
8186
8287ObjectSetPrototypeOf ( Readable . prototype , Stream . prototype ) ;
8388ObjectSetPrototypeOf ( Readable , Stream ) ;
@@ -213,6 +218,7 @@ function Readable(options) {
213218 }
214219
215220 this [ kConsume ] = null ;
221+ this [ kReading ] = false ; // Is stream being consumed through Readable API?
216222
217223 Stream . call ( this , options ) ;
218224
@@ -238,6 +244,11 @@ Readable.prototype[EE.captureRejectionSymbol] = function(err) {
238244// similar to how Writable.write() returns true if you should
239245// write() some more.
240246Readable . prototype . push = function ( chunk , encoding ) {
247+ if ( this [ kConsume ] && chunk !== null && ! this [ kReading ] ) {
248+ encoding = encoding || this . _readableState . defaultEncoding ;
249+ return this [ kConsume ] . push ( chunk , encoding ) ;
250+ }
251+
241252 return readableAddChunk ( this , chunk , encoding , false ) ;
242253} ;
243254
@@ -307,10 +318,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
307318 maybeReadMore ( stream , state ) ;
308319 }
309320
321+ const consumed = this [ kConsume ] ? this [ kConsume ] . push ( chunk , encoding ) : true ;
322+
310323 // We can push more data if we are below the highWaterMark.
311324 // Also, if we have no data yet, we can stand some more bytes.
312325 // This is to work around cases where hwm=0, such as the repl.
313- return ! state . ended &&
326+ return consumed && ! state . ended &&
314327 ( state . length < state . highWaterMark || state . length === 0 ) ;
315328}
316329
@@ -402,6 +415,27 @@ function howMuchToRead(n, state) {
402415 return state . ended ? state . length : 0 ;
403416}
404417
418+
419+ function _read ( self , n ) {
420+ // Call internal read method
421+ try {
422+ const result = self . _read ( n ) ;
423+ if ( result != null ) {
424+ const then = result . then ;
425+ if ( typeof then === 'function' ) {
426+ then . call (
427+ result ,
428+ nop ,
429+ function ( err ) {
430+ errorOrDestroy ( self , err ) ;
431+ } ) ;
432+ }
433+ }
434+ } catch ( err ) {
435+ errorOrDestroy ( self , err ) ;
436+ }
437+ }
438+
405439// You can override either this method, or the async _read(n) below.
406440Readable . prototype . read = function ( n ) {
407441 debug ( 'read' , n ) ;
@@ -496,22 +530,7 @@ Readable.prototype.read = function(n) {
496530 state . needReadable = true ;
497531
498532 // Call internal read method
499- try {
500- const result = this . _read ( state . highWaterMark ) ;
501- if ( result != null ) {
502- const then = result . then ;
503- if ( typeof then === 'function' ) {
504- then . call (
505- result ,
506- nop ,
507- function ( err ) {
508- errorOrDestroy ( this , err ) ;
509- } ) ;
510- }
511- }
512- } catch ( err ) {
513- errorOrDestroy ( this , err ) ;
514- }
533+ _read ( this , state . highWaterMark ) ;
515534
516535 state . sync = false ;
517536 // If _read pushed data synchronously, then `reading` will be false,
@@ -906,6 +925,8 @@ Readable.prototype.on = function(ev, fn) {
906925 const state = this . _readableState ;
907926
908927 if ( ev === 'data' ) {
928+ this [ kReading ] = true ;
929+
909930 // Update readableListening so that resume() may be a no-op
910931 // a few lines down. This is needed to support once('readable').
911932 state . readableListening = this . listenerCount ( 'readable' ) > 0 ;
@@ -914,6 +935,8 @@ Readable.prototype.on = function(ev, fn) {
914935 if ( state . flowing !== false )
915936 this . resume ( ) ;
916937 } else if ( ev === 'readable' ) {
938+ this [ kReading ] = true ;
939+
917940 if ( ! state . endEmitted && ! state . readableListening ) {
918941 state . readableListening = state . needReadable = true ;
919942 state . flowing = false ;
@@ -1310,7 +1333,7 @@ ObjectDefineProperties(ReadableState.prototype, {
13101333 body : {
13111334 get ( ) {
13121335 if ( this [ kConsume ] ?. type === kWebStreamType ) {
1313- return this [ kConsume ] . body ;
1336+ return this [ kConsume ] . stream ;
13141337 }
13151338
13161339 return consume ( this , kWebStreamType ) ;
@@ -1343,8 +1366,7 @@ ObjectDefineProperties(ReadableState.prototype, {
13431366} ) ;
13441367
13451368function isLocked ( self ) {
1346- return self [ kConsume ] &&
1347- ( self [ kConsume ] . type !== kWebStreamType || self [ kConsume ] . body . locked ) ;
1369+ return self [ kConsume ] ?. stream ?. locked === true ;
13481370}
13491371
13501372// https://streams.spec.whatwg.org/#readablestream-disturbed
@@ -1363,56 +1385,193 @@ function consume(self, type) {
13631385 }
13641386
13651387 if ( type === kWebStreamType ) {
1366- self [ kConsume ] = {
1388+ if ( ! ReadableStream ) {
1389+ ReadableStream = require ( 'internal/webstreams/readablestream' )
1390+ . ReadableStream ;
1391+ }
1392+
1393+ const objectMode = self . readableObjectMode ;
1394+ const highWaterMark = self . readableHighWaterMark ;
1395+ // When not running in objectMode explicitly, we just fall
1396+ // back to a minimal strategy that just specifies the highWaterMark
1397+ // and no size algorithm. Using a ByteLengthQueuingStrategy here
1398+ // is unnecessary.
1399+ let strategy ;
1400+ if ( objectMode ) {
1401+ if ( ! CountQueuingStrategy ) {
1402+ CountQueuingStrategy = require ( 'internal/webstreams/queuingstrategies' ) ;
1403+ }
1404+ strategy = new CountQueuingStrategy ( { highWaterMark } ) ;
1405+ } else {
1406+ strategy = { highWaterMark } ;
1407+ }
1408+
1409+ self
1410+ . on ( 'error' , function ( err ) {
1411+ const { controller } = this [ kConsume ] ;
1412+ controller . error ( err ) ;
1413+ } )
1414+ . on ( 'close' , function ( ) {
1415+ const { controller } = this [ kConsume ] ;
1416+ if ( controller ) {
1417+ controller . error ( new AbortError ( ) ) ;
1418+ }
1419+ } ) ;
1420+
1421+ const consume = self [ kConsume ] = {
13671422 type,
1368- body : Readable . toWeb ( self )
1423+ objectMode,
1424+ controller : null ,
1425+ push ( chunk ) {
1426+ const { objectMode, controller } = this ;
1427+
1428+ assert ( controller ) ;
1429+
1430+ if ( chunk === null ) {
1431+ controller . close ( ) ;
1432+ this . controller = null ;
1433+ } else {
1434+ if ( ! objectMode ) {
1435+ if ( typeof chunk === 'string' ) {
1436+ chunk = new Uint8Array ( Buffer . from ( chunk ) ) ;
1437+ } else if ( Buffer . isBuffer ( chunk ) ) {
1438+ // Copy the Buffer to detach it from the pool.
1439+ chunk = new Uint8Array ( chunk ) ;
1440+ } else if ( Stream . _isUint8Array ( chunk ) ) {
1441+ // Do nothing...
1442+ } else if ( chunk != null ) {
1443+ throw new ERR_INVALID_ARG_TYPE (
1444+ 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ;
1445+ }
1446+ }
1447+
1448+ // TODO: Does controller perform any type checks?
1449+ controller . enqueue ( chunk ) ;
1450+ }
1451+
1452+ return controller . desiredSize > 0 ;
1453+ } ,
1454+ stream : new ReadableStream ( {
1455+ async start ( controller ) {
1456+ consume . controller = controller ;
1457+
1458+ const { _readableState : state } = self ;
1459+
1460+ if ( self [ kReading ] ) {
1461+ while ( controller . desiredSize > 0 ) {
1462+ const chunk = self . read ( ) ;
1463+ if ( chunk === null ) {
1464+ break ;
1465+ }
1466+ controller . enqueue ( chunk ) ;
1467+ }
1468+ } else {
1469+ const buffer = state . buffer ;
1470+ while ( buffer . length ) {
1471+ controller . enqueue ( buffer . shift ( ) ) ;
1472+ }
1473+ state . lenth = 0 ;
1474+ }
1475+
1476+ if ( state . ended ) {
1477+ controller . close ( ) ;
1478+ }
1479+
1480+ if ( ! state . constructed ) {
1481+ await EE . once ( destroyImpl . kConstruct , self ) ;
1482+ }
1483+ } ,
1484+ pull ( ) {
1485+ const { _readableState : state } = self ;
1486+
1487+ const n = consume . controller . desiredSize ;
1488+
1489+ if ( self [ kReading ] ) {
1490+ assert ( state . length === 0 ) ;
1491+ self . read ( n ) ;
1492+ } else {
1493+ _read ( self , n ) ;
1494+ }
1495+ } ,
1496+ cancel ( reason ) {
1497+ self . destroy ( reason ) ;
1498+ } ,
1499+ } , strategy )
13691500 } ;
13701501
1371- return self [ kConsume ] . body ;
1502+ return consume . stream ;
13721503 }
13731504
13741505 return new Promise ( ( resolve , reject ) => {
13751506 self [ kConsume ] = {
13761507 type,
13771508 resolve,
13781509 reject,
1379- body : type === kTextType || type === kJSONType ? '' : [ ]
1380- } ;
1381- self
1382- . on ( 'error' , reject )
1383- . on ( 'data' , function ( val ) {
1384- const { type } = this [ kConsume ] ;
1510+ decoder : null ,
1511+ body : type === kTextType || type === kJSONType ? '' : [ ] ,
1512+ push ( chunk , encoding ) {
1513+ const { type, body, resolve, decoder } = this [ kConsume ] ;
1514+
1515+ if ( chunk === null ) {
1516+ try {
1517+ if ( type === kTextType ) {
1518+ resolve ( body + ( decoder ? decoder . end ( ) : '' ) ) ;
1519+ } else if ( type === kJSONType ) {
1520+ resolve ( JSONParse ( body + ( decoder ? decoder . end ( ) : '' ) ) ) ;
1521+ } else if ( type === kArrayBufferType ) {
1522+ resolve ( Buffer . concat ( body ) . buffer ) ;
1523+ } else if ( type === kBlobType ) {
1524+ if ( ! Blob ) {
1525+ Blob = require ( 'buffer' ) . Blob ;
1526+ }
1527+ resolve ( new Blob ( body ) ) ;
1528+ }
13851529
1386- // TODO (fix): Do we need type check and/or conversion?
1530+ this [ kConsume ] . body = null ;
1531+ } catch ( err ) {
1532+ self . destroy ( err ) ;
1533+ }
1534+ } else if ( type === kTextType || type === kJSONType ) {
1535+ if ( typeof chunk === 'string' ) {
1536+ if ( decoder ) {
1537+ chunk = decoder . write ( Buffer . from ( chunk ) ) ;
1538+ }
1539+ // TODO: Encoding check/transform?
1540+ } else if ( chunk instanceof Buffer ) {
1541+ if ( ! decoder ) {
1542+ this [ kConsume ] . decoder = new StringDecoder ( 'utf8' ) ;
1543+ }
1544+ encoding = decoder . write ( chunk ) ;
1545+ } else if ( Stream . _isUint8Array ( chunk ) ) {
1546+ encoding = decoder . write ( Stream . _uint8ArrayToBuffer ( chunk ) ) ;
1547+ } else {
1548+ throw new ERR_INVALID_ARG_TYPE (
1549+ 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ;
1550+ }
13871551
1388- if ( type === kTextType || type === kJSONType ) {
1389- this [ kConsume ] . body += val ;
1552+ this [ kConsume ] . body += chunk ;
13901553 } else {
1391- this [ kConsume ] . body . push ( val ) ;
1392- }
1393- } )
1394- . on ( 'end' , function ( ) {
1395- const { type, resolve, body } = this [ kConsume ] ;
1396-
1397- try {
1398- if ( type === kTextType ) {
1399- resolve ( body ) ;
1400- } else if ( type === kJSONType ) {
1401- resolve ( JSONParse ( body ) ) ;
1402- } else if ( type === kArrayBufferType ) {
1403- resolve ( Buffer . concat ( body ) . buffer ) ;
1404- } else if ( type === kBlobType ) {
1405- if ( ! Blob ) {
1406- Blob = require ( 'buffer' ) . Blob ;
1407- }
1408- resolve ( new Blob ( body ) ) ;
1554+ if ( typeof chunk === 'string' ) {
1555+ chunk = Buffer . from ( chunk ) ;
1556+ // TODO: Encoding check/transform?
1557+ } else if ( chunk instanceof Buffer ) {
1558+ // Do nothing...
1559+ } else if ( Stream . _isUint8Array ( chunk ) ) {
1560+ chunk = Stream . _uint8ArrayToBuffer ( chunk ) ;
1561+ } else {
1562+ throw new ERR_INVALID_ARG_TYPE (
1563+ 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ;
14091564 }
14101565
1411- this [ kConsume ] . body = null ;
1412- } catch ( err ) {
1413- self . destroy ( err ) ;
1566+ this [ kConsume ] . body . push ( chunk ) ;
14141567 }
1415- } )
1568+
1569+ return true ;
1570+ }
1571+ } ;
1572+
1573+ self
1574+ . on ( 'error' , reject )
14161575 . on ( 'close' , function ( ) {
14171576 const { body, reject } = this [ kConsume ] ;
14181577
@@ -1522,5 +1681,6 @@ Readable.fromWeb = function(readableStream, options) {
15221681} ;
15231682
15241683Readable . toWeb = function ( streamReadable ) {
1525- return lazyWebStreams ( ) . newReadableStreamFromStreamReadable ( streamReadable ) ;
1684+ return streamReadable [ kConsume ] !== undefined ? streamReadable . stream :
1685+ lazyWebStreams ( ) . newReadableStreamFromStreamReadable ( streamReadable ) ;
15261686} ;
0 commit comments