从Readable steam(可读流)中读取数据的方式

可读流是对数据源的抽象,从Readable中读取数据的模式有两种,non-flowing(非流动模式也称为paused)模式和flowing(流动)模式。
非流动模式

  • 在非流动模式下,当流中有可读取的数据时,会触发readable事件,所以该事件被触发时需要反复调用流的read方法从缓存中获取最新的数据直到read方法返回null,表示没有数据可读,新建的可读流默认处于该模式下。常用方式如下:
const stream = fs.createReadStream(path.resolve(__dirname, './xxx.js')); stream.on('readable', () => { let chunk; while (null !== (chunk = stream.read())) { console.log('chunk: ' + chunk.toString()); } });

  • 在没有指定编码方式的前提下,read方法返回的是Buffer,可以调用setEncoding方法指定数据的编码,这样调用read方法时就可以直接获取到string。提前指定encoding的好处是对于多字节字符流会确保多个字节不会被分割到不同的chunk中:
const stream = fs.createReadStream(path.resolve(__dirname, './print.js'), { encoding: 'utf8' }); stream.on('readable', () => { let chunk; while (null !== (chunk = stream.read())) { console.log('chunk: ' + chunk); } });

  • 在非流动模式下调用Readable.resume()或者通过pipe连接到一个Writable,可切换到流动模式,如果此时没有注册data事件处理函数,那么产生的新数据回丢失。
流动模式
  • 可以通过注册data事件处理函数让可读流切换到流动模式,在该模式下当数据源中有新的数据准备就绪时,新的数据会被传递给data事件的处理函数,可以在函数中处理新的数据。如通过以下方式读取文件的内容:
const stream = fs.createReadStream(path.resolve(__dirname, './print.js'), { encoding: 'utf8' }); let chunks = ''; stream.on('data', (chunk) => { chunks += chunk; }); stream.on('end', () => { console.log(chunks); });

  • 在流动模式下,可以通过两种方式切换回pause模式:
    1. 调用pause方法;
    2. 当可读流通过pipe管道连接到其他流是,调用unpipe也会使流切换到pause模式;
  • 在流动模式下注册readable事件处理函数会让流停止流动,需要通过read读取数据;如果readable处理函数被移除,如果存在data处理函数,流会再次进入flowing模式;
异步迭代器 新版本的node中的Readable实现了异步迭代器,可以更加方便的读取数据:
const stream = fs.createReadStream(path.resolve(__dirname, './print.js'), { encoding: 'utf8' }); async function getData(stream) { let chunks = ''; for await (let chunk of stream) { chunks += chunk; } return chunks; }getData(stream).then(data => console.log(data));

如果循环通过break, return, throw 终止,流会被销毁。每次读取的chunk的大小等于流的highWaterMark选项,没有指点该选项的情况下,默认值为64kb, 因此如果文件的大小小于该值,文件的内容会在单个chunk中被返回。
补充 【从Readable steam(可读流)中读取数据的方式】流中的数据也有两种模式,二进制模式和对象模式。二进制模式下读取的数据类型为Buffer或者String, 前面的例子都是在二进制模式下读取数据。而对象模式下可以返回任意的javascript的对象,下面举一个简单的例子, Readable.from()创建的流默认是objectMode:
const stream = Readable.from([{ name: 'Bob', }, { name: 'Jenny' }, { name: 'Lili' }]); async function getData(stream) { for await (let chunk of stream) { console.log(chunk); } }getData(stream);

输出如下:
{ name: 'Bob' } { name: 'Jenny' } { name: 'Lili' }

    推荐阅读