Skip to content

node 可写流 #16

@Alexis374

Description

@Alexis374
Writable Streams

Writable streams are an abstraction for a destination to which data is written.

可写流是对数据被写入(的地方)的抽象

Examples of Writable streams include:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

可写流的实例有如上几个。上面列出的可写流有些是Duplex流,实现了Writable接口。

All Writable streams implement the interface defined by the stream.Writable class.

所有的可写流都实现了 stream.Writable 类定义的接口

While specific instances of Writable streams may differ in various ways, all Writable streams follow the same fundamental usage pattern as illustrated in the example below:

尽管具体的可写流实例千差万别,所有的实例都遵循如下所示的相同的基础使用模式:

const myStream = getWritableStreamSomehow(); 
myStream.write('some data'); 
myStream.write('some more data'); 
myStream.end('done writing data'); 

Class: stream.Writable

close事件

The 'close' event is emitted when the stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur.

当流和流底层的任意资源(如文件描述符)关闭时,close事件会被触发。close事件意味着不会有其他的事件被触发,也不会有其他的计算发生了。

Not all Writable streams will emit the 'close' event.

不是所有的可写流都触发close事件

drain事件

If a call to stream.write(chunk) returns false, the 'drain' event will be emitted when it is appropriate to resume writing data to the stream.

如果调用stream.write(chunk)返回假,当可以向流写数据的时候,drain事件就会被触发

// Write the data to the supplied writable stream one million times. 
// Be attentive to back-pressure. 
function writeOneMillionTimes(writer, data, encoding, callback) { 
let i = 1000000; 
write(); 
function write() { 
var ok = true; 
do { 
i--; 
if (i === 0) { 
// last time! 
writer.write(data, encoding, callback); 
} else { 
// see if we should continue, or wait 
// don't pass the callback, because we're not done yet. 
ok = writer.write(data, encoding); 
} 
} while (i > 0 && ok); 
if (i > 0) { 
// had to stop early! 
// write some more once it drains 
writer.once('drain', write); 
} 
} 
} 
error事件

The 'error' event is emitted if an error occurred while writing or piping data. The listener callback is passed a single Error argument when called.

error事件会在写数据或导流数据出错时触发。回调函数在调用的时候会接收一个Error参数

Note: The stream is not closed when the 'error' event is emitted.

注意 error触发的时候流并没有关闭

finish事件

finish事件在stream.end()调用后触发,所有数据都被冲到底层的系统中 (即流的缓冲区中不再有数据)。

const writer = getWritableStreamSomehow(); 
for (var i = 0; i < 100; i ++) { 
writer.write('hello, #${i}!\n'); 
} 
writer.end('This is the end\n'); 
writer.on('finish', () => { 
console.error('All writes are now complete.'); 
}); 
pipe 事件

The 'pipe' event is emitted when the stream.pipe() method is called on a readable stream, adding this writable to its set of destinations.

pipe 事件在一个可读流调用pipe方法的时候被触发,把这个可写流作为可读流数据的目的地。

  • src <stream.Readable> source stream that is piping to this writable

回调函数中的src参数指的是导流的那个可读流。

const writer = getWritableStreamSomehow(); 
const reader = getReadableStreamSomehow(); 
writer.on('pipe', (src) => { 
console.error('something is piping into the writer'); 
assert.equal(src, reader); 
}); 
reader.pipe(writer); 
unpipe 事件

The 'unpipe' event is emitted when the stream.unpipe() method is called on a Readable stream, removing this Writable from its set of destinations.

unpipe 事件在一个可读流调用unpipe方法的时候被触发,将这个流从可读流的目标移除。

const writer = getWritableStreamSomehow(); 
const reader = getReadableStreamSomehow(); 
writer.on('unpipe', (src) => { 
console.error('Something has stopped piping into the writer.'); 
assert.equal(src, reader); 
}); 
reader.pipe(writer); 
reader.unpipe(writer); 
writable.cork()

The writable.cork() method forces all written data to be buffered in memory. The buffered data will be flushed when either the stream.uncork() or stream.end() methods are called.

writable.cork() 强制所有写入的数据都存在内存的缓冲区中。缓存的数据会在调用 stream.uncork() 或 stream.end() 调用的时候写入底层并清除缓冲区。

The primary intent of writable.cork() is to avoid a situation where writing many small chunks of data to a stream do not cause a backup in the internal buffer that would have an adverse impact on performance. In such situations, implementations that implement the writable._writev() method can perform buffered writes in a more optimized manner.

writable.cork()的最初目的是避免这样一种情景:写入许多小块数据到流中不会在流的内部缓冲区中备份,这会影响性能。在这种情况下,实现了writable._writev()方法的可以以一种更优化的方式执行写入缓存操作。

writable.end([chunk][, encoding][, callback])
  • chunk <String> | <Buffer> | <any> Optional data to write. For streams not operating in object mode, chunk must be a string or a Buffer. For object mode streams, chunk may be any JavaScript value other than null.
  • encoding <String> The encoding, if chunk is a String
  • callback <Function> Optional callback for when the stream is finished

chunk 在非对象模式时必须是是Buffer或String类型的。在对象模式下可以是任意类型。
encoding 在chunk是String类型时可以指定编码格式
callback 流结束的时候调用的回调函数

Calling the writable.end() method signals that no more data will be written to the Writable. The optional chunk and encoding arguments allow one final additional chunk of data to be written immediately before closing the stream. If provided, the optional callback function is attached as a listener for the 'finish' event.

调用end()方法意味着不会再有更多的数据写入流对象了。chucnk和encoding参数允许在流关闭前写入最后一部分数据。如果提供了callback参数,则将它当做是finish事件的回调。

Calling the stream.write() method after calling stream.end() will raise an error.

在end()之后再调用stream.write()方法会报错。

writable.setDefaultEncoding(encoding)

The writable.setDefaultEncoding() method sets the default encoding for a Writable stream.

这个方法设置了默认的编码方式.函数返回的是stream本身。

writable.uncork()

The writable.uncork() method flushes all data buffered since stream.cork() was called.

writable.uncork() 将自从writable.cork()调用后缓存的所有数据清洗掉(指清空缓冲区,写入底层)

When using writable.cork() and writable.uncork() to manage the buffering of writes to a stream, it is recommended that calls to writable.uncork() be deferred using process.nextTick(). Doing so allows batching of all writable.write() calls that occur within a given Node.js event loop phase.

当用cork和uncork来管理写入到流的缓冲数据时,推荐在process.nextTick中uncork。这样做可以在一次事件循环中批处理stream.write()的调用。

stream.cork(); 
stream.write('some '); 
stream.write('data '); 
process.nextTick(() => stream.uncork()); 

If the writable.cork() method is called multiple times on a stream, the same number of calls to writable.uncork() must be called to flush the buffered data.

uncork 和cork调用的次数应该相等

stream.cork(); 
stream.write('some '); 
stream.cork(); 
stream.write('data '); 
process.nextTick(() => { 
stream.uncork(); 
// The data will not be flushed until uncork() is called a second time. 
stream.uncork(); 
}); 
writable.write(chunk[, encoding][, callback])
  • callback <Function> Callback for when this chunk of data is flushed
  • Returns: <Boolean> false if the stream wishes for the calling code to wait for the 'drain' event to be emitted before continuing to write additional data; otherwise true.

callback指定的函数在数据被冲洗(写入底层)的时候调用。write函数返回布尔值,如果stream希望在drain事件被触发后再写入更多数据,则返回false,否则返回true。

私货:如上所示,callback在写入底层后才会调用。如下所示 在5秒以后write的callback才调用。

var stream = require('stream'), 
fs = require('fs'); 

writer = fs.createWriteStream('corkwrite.txt') 

writer.cork() 

writer.write("fuckyou",function(){ 
console.error('write callback') 
}) 

setTimeout(function(){ 
console.log('uncork called') 
writer.uncork(); 
},5000) 

The writable.write() method writes some data to the stream, and calls the supplied callback once the data has been fully handled. If an error occurs, the callback may or may not be called with the error as its first argument. To reliably detect write errors, add a listener for the 'error' event.

write()函数写数据到stream,一旦数据被处理后,调用回调。如果错误发生,回调函数的第一个参数不一定是error对象。更可靠的方法是监听error事件。

The return value is true if the internal buffer is less than the highWaterMark configured when the stream was created after admitting chunk. If false is returned, further attempts to write data to the stream should stop until the 'drain' event is emitted.

在接收过数据后,如果内部的缓冲区小于 初始化时设定的highWaterMark返回true。如果返回了false,之后尝试写数据的操作应该在drain事件被触发后再执行。

While a stream is not draining, calls to write() will buffer chunk, and return false. Once all currently buffered chunks are drained (accepted for delivery by the operating system), the 'drain' event will be emitted. It is recommended that once write() returns false, no more chunks be written until the 'drain' event is emitted. While calling write() on a stream that is not draining is allowed, Node.js will buffer all written chunks until maximum memory usage occurs, at which point it will abort unconditionally. Even before it aborts, high memory usage will cause poor garbage collector performance and high RSS (which is not typically released back to the system, even after the memory is no longer required). Since TCP sockets may never drain if the remote peer does not read the data, writing a socket that is not draining may lead to a remotely exploitable vulnerability.

当流没有耗尽时,调用write()会缓存数据,然后返回false。一旦所有缓存的部分都被底层接受,drain事件会被触发。推荐在write返回false的时候,不再像流中写数据,知道drain被触发为止。尽管在流未耗尽的时候调用write()是允许的,Node会缓存所有的数据,直到超过内存的最大值。超过内存最大值的时候,会无条件地终止程序运行。即使没有终止运行,太高的内存占用会导致垃圾回收的性能降低以及高的 RSS(即使在内存不使用的情况下依然不贵返回给系统)。因为如果远程连接的一方不读取数据,TCP socket可能永远不会耗尽,所以向一个没有耗尽的socket写数据会导致远程漏洞利用。

Writing data while the stream is not draining is particularly problematic for a Transform, because the Transform streams are paused by default until they are piped or an 'data' or 'readable' event handler is added.

对于Transform流,如果在流没有耗尽的情况下写数据会产生问题。因为Transform流默认会暂停,直到它们调用了pipe(),或者readable,data事件处理程雪被添加了。

If the data to be written can be generated or fetched on demand, it is recommended to encapsulate the logic into a Readable and use stream.pipe(). However, if calling write() is preferred, it is possible to respect backpressure and avoid memory issues using the the 'drain' event:

如果要写入的数据可以按需产生或获取,推荐把数据封装成可读流,并用stream.pipe().但如果更想用可写流的write()方法的话,尽可能遵循背压(backpressure)(的原则),并用drain事件避免内存占用过高的问题。

function write (data, cb) { 
if (!stream.write(data)) { 
stream.once('drain', cb) 
} else { 
process.nextTick(cb) 
} 
} 

// Wait for cb to be called before doing any other write. 
write('hello', () => { 
console.log('write completed, do more writes now') 
}) 

A Writable stream in object mode will always ignore the encoding argument.

在对象模式下,可写流会忽略encoding参数

Metadata

Metadata

Assignees

No one assigned

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions