node常用内置模块(Stream)

node常用内置模块(Stream) 一、Stream了解

node.js中的流就是处理流式数据的抽象接口,文件操作系统和网络模块实现了流接口

不使用流的常见问题:
同步读取资源文件,用户与要等待数据读取完成 资源文件最终一次性加载至内存,开销较大

使用流的图解(数据的分段传输):
node常用内置模块(Stream)
文章图片

配合管道对需求的加工:
node常用内置模块(Stream)
文章图片

流处理数据的优势:
时间效率:流的分段处理可以同时操作多个数据chunk 空间效率:同一时间流无须占据大内存空间 使用方便:流配合管理,扩展程序变得简单

node.js中流的分类:
Readalbe:可读流,能够是实现数据的读取 Writealbe:可写流,能够实现数据的写操作 Duplex:双工流,既可读又可写 Transform:转换流,可读可写,还是实现数据转换

nodejs流特点:
Stream模块实现了四个具体的抽象 所有流都继承自EventEmitter

二、基本API
1.可读流
专门生产供程序消费数据的流

自定义可读流:
继承stream里的Readable 重写_read方法调用push产出数据

可读流基本原理:
node常用内置模块(Stream)
文章图片

消费数据:
readable事件:当流中存在可读取的数据是触发 data事件:当流中数据块传给消费者后触发

自定义可读流代码实现:
const { Readable } = require('stream'); // 模拟底层数据 let source = ['lg', 'zce', 'syy']; class MyReadable extends Readable { constructor(source) { super(); this.source = source; } _read() { let data = https://www.it610.com/article/this.source.shift() || null; // 如果没有数据,则返回 null this.push(data); // 将数据推入到流中 } }let mr = new MyReadable(source); // reaadable 默认是暂停模式 // mr.on('readable', () => { //let data = https://www.it610.com/article/null; //while(data = mr.read(2)){ //console.log(data.toString()); //} // })// 有可能都不放入缓存中,直接输出 mr.on('data', (chunk) => { console.log(chunk.toString()); })

2.可写流
用于消费数据的流

自定义可写流:
继承stream模块的Writeable 重写_write方法,调用write执行写入

可写流事件
pipe事件:可读流调用pipe()方法时触发 unpipe事件:可读流调用unpipe()方法时触发

自定义可写流代码实现:
const { Writable } = require('stream'); class MyWriteable extends Writable { constructor() { super(); } _write(chunk, encoding, done) { process.stdout.write(chunk.toString() + '<-----\n'); process.nextTick(done); } }let mw = new MyWriteable(); mw.write('江江学习', 'utf-8', () => { console.log('write success'); })

3.双工流
Duplex是双工流,既能生产又能消费,读写相互独立,读操作创建的数据不能当作写操作的数据源去使用

自定义双工流
继承Duplex类 重写_read方法,调用push生产数据 重写_write方法,调用write消费数据

代码实现:
let { Duplex } = require('stream'); class MyDuplex extends Duplex { constructor(source) { super(); this.source = source; } _read() { let data = https://www.it610.com/article/this.source.shift() || null; this.push(data); } _write(chunk, en, next) { process.stdout.write(chunk.toString() +'<-----\n'); process.nextTick(next); } }let source = ['hello', 'world', '!']; let md = new MyDuplex(source); md.write('江江',()=>{ console.log('write success'); })md.on('data', (chunk) => { console.log(chunk.toString()); })

Transform
Transform也是一个双工流,读写操作进行了联通

Transform自定义实现:
继承Transform类 重写_transform方法,调用push和callback 重写_flush方法,处理剩余数据

transform自定义代码实现:
let { Transform } = require('stream'); class MyTransform extends Transform{ constructor(){ super(); }_transform(chunk,en,cb){ this.push(chunk.toString().toUpperCase()); cb(null); } }let t = new MyTransform(); t.write('hello'); t.on('data',(chunk)=>{ console.log(chunk.toString()); })

三、文件读写流
1.文件可读流 文件可读流代码中使用:
const fs = require('fs'); const path = require('path'); let rs = fs.createReadStream('test.txt', { flags: 'r', encoding: null,// 返回buffer fd: null,// 默认值从3开始的, 0、1、2被输入、输出、错误占用了 mode: 438,// 权限控制 autoClose: false,// 是否自动关闭文件 start: 0,// 从文件的某个位置开始读取 // end: 10,// 在文件的某个位置结束读取 highWaterMark: 16// 每次准备多少个字节的数据让读取(调用push放入缓存区里面),Readable中默认16个,文件可读流中(此处)默认64个 })// data 事件 // rs.on('data',(chunk)=>{ //console.log(chunk.toString()); //rs.pause(); // 流动模式切换到暂停模式 //setTimeout(()=>{ //rs.resume(); // 恢复到流动模式 //},1000) // })// readable 事件 rs.on('readable', () => { // let data = https://www.it610.com/article/rs.read(); // console.log(data) let data = null; while(data = rs.read(3)){// 每次从缓存中读取多少个字节 console.log(data.toString()); console.log('------',rs._readableState.length); // 剩余多少个字节 } })

其它事件:
const fs = require('fs'); const path = require('path'); let rs = fs.createReadStream('test.txt', { flags: 'r', encoding: null,// 返回buffer fd: null,// 默认值从3开始的, 0、1、2被输入、输出、错误占用了 mode: 438,// 权限控制 autoClose: false,// 是否自动关闭文件 start: 0,// 从文件的某个位置开始读取 // end: 10,// 在文件的某个位置结束读取 highWaterMark: 16// 每次准备多少个字节的数据让读取(调用push放入缓存区里面),Readable中默认16个,文件可读流中(此处)默认64个 })rs.on('open', (fd) => { console.log('fd', fd,'文件打开了'); })rs.on('close',()=>{ console.log('文件关闭了') }) let bufferArr = []; rs.on('data',(chunk)=>{ bufferArr.push(chunk) })rs.on('end',()=>{ console.log(Buffer.concat(bufferArr).toString()) console.log('数据被清空之后') })rs.on('error',()=>{ console.log('出错了') })

2.文件可写流 可写流常用事件:
const fs = require('fs'); const path = require('path'); const ws = fs.createWriteStream('test.txt', { flags: 'w', mode: 438, fd: null, encoding: 'utf-8', start: 0, highWaterMark: 16// 默认16kb })ws.write('拉钩教育', () => { console.log('拉钩教育-数据写完了') })// 字符串 或者 buffer ===> fs rs // ws.write(123456,()=>{ //console.log('123456-数据写完了') // })ws.on('open', (fd) => { console.log('open', fd) })// colose 是在数据写入操作全部完成之后再执行 ws.on('close',()=>{ console.log('文件关闭了'); })ws.write('0'); // end 执行之后就意味着数据写入操作完成 ws.end('jiang'); // 可最后写入一次// ws.write('2'); ws.on('error',(err)=>{ console.log('出错了'); })

write执行流程:
node常用内置模块(Stream)
文章图片

drain事件与读写速度:
/** * 需求:"江江学习" 写入指定的文件 * 01 一次性写入 * 02 分批写入 * 对比:对内存的压力不同 */const fs = require('fs'); let ws = fs.createWriteStream('test.txt', { highWaterMark: 3 }); // ws.write('江江学习'); let source = '江江学习'.split(''); let num = 0; let flag = true; function executeWrite() {while (num != source.length && flag) { flag = ws.write(source[num++]); // 当写入的数据大于等于hightWaterMark时,会返回false } }executeWrite(); ws.on('drain',()=>{// 缓存中的数据已经被消费完了,才触发 console.log('drain 执行了'); flag = true; executeWrite(); })

四、背压机制
让数据在的生产者与消费者平滑流动的机制

1.问题发现 看一段代码发现问题:
node常用内置模块(Stream)
文章图片

数据从磁盘读取出来的速度是远远大于写入磁盘的速度的(消费者的速度跟不到生产者的速度的),WriteAble内部维护了一个队列,不能即使的消费数据导致的产能过剩,就会放入该队列中,但队列长度是有上限的,所以在当读写的过程中,如果没有实现被压机制的化,就可能会导致
内存溢出 其它进程运行变慢 GC频繁调用

了解读写机制:
Readable运行机制:
node常用内置模块(Stream)
文章图片

Writeable运行机制:
node常用内置模块(Stream)
文章图片

背压机制基本原理代码:
let fs = require('fs'); let rs = fs.createReadStream('test.txt', { highWaterMark: 4// Readable默认是16,fs中createReadStream默认为64 })let ws = fs.createWriteStream('test1.txt', { highWaterMark: 1 })let flag = true; rs.on('data',(chunk)=>{ flag = ws.write(chunk,()=>{ console.log('写完了'); }) if(!flag){ rs.pause(); } })ws.on('drain',()=>{ rs.resume(); })// 可以直接使用pipe // rs.pipe(ws);

2.模拟可读流 代码实现:
const fs = require('fs'); const EventEmitter = require('events'); class MyFileReadStream extends EventEmitter { constructor(path, options = {}) { super(); this.path = path; this.flags = options.flags || 'r'; this.mode = options.mode || 438; this.autoClose = options.autoClose || true; this.start = options.start || 0; this.end = options.end; this.highWaterMark = options.highWaterMark || 64 * 1024; this.readOffset = 0; this.open(); // 当监听新的事件时,会被触发 this.on('newListener', (type) => { if (type == 'data') { this.read(); } }) } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { // 触发自生error事件,这里是回调函数,执行在同步代之后 this.emit('error', err); } this.fd = fd; this.emit('open', this.fd); }); }read() { if (typeof this.fd != 'number') { return this.once('open', this.read); } let buf = Buffer.alloc(this.highWaterMark); // let howMuchToRead; // 每次读多少 // if (this.end) { //// 判断end是否有存在 //howMuchToRead = Math.min(this.end - this.readOffset + 1, this.highWaterMark); // 使用剩余未读的字节数与highWaterMark中较小的一个 // } else { //howMuchToRead = this.highWaterMark; // 使用剩余未读的字节数与highWaterMark中较小的一个 // } // 可以取到末尾end下标的值,所以这里要加一 let howMuchToRead = this.end?Math.min(this.end - this.readOffset + 1, this.highWaterMark):this.highWaterMarkfs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => { if (readBytes) { this.readOffset += readBytes; this.emit('data', buf.slice(0, readBytes)) this.read() } else { this.emit('end') this.close() } }) }close() { fs.close(this.fd, () => { this.emit('close') }); } }let rs = new MyFileReadStream('test.txt', { end: 7, // 结束位置的下标,可以取到 highWaterMark: 3 }); rs.on('open', (fd) => { // 这里是同步代码,监听该事件在触发事件之前 console.log('open', fd); })rs.on('error', (err) => { console.log(err); })rs.on('data', (chunk) => { console.log(chunk) })rs.on('end', () => { console.log('end') })rs.on('close', () => { console.log('close') })

五、链表
使用wirte时,有些被写入的内容需要放入缓存中被排队等待,而且要遵循先进先出的规则,这里使用链表的数据结构来保存这些数据

为什么不使用数组:
数组存储数据的长度具有上限 数组存在塌陷问题

模拟链表实现队列:
class Node { constructor(element, next = null) { this.element = element; this.next = next; } }class LinkedList { constructor() { this.head = null; this.size = 0 } // 获取指定位置节点 _getNode(index) { if (index < 0 || index >= this.size) { throw new Error('getNode --> index error') }let currentNode = this.head; while (index--) { currentNode = currentNode.next; } return currentNode; }// 确保该下标的位置合法 _checkIndex(index) { if (index < 0 || index >= this.size) { throw new Error('index 参数错误') } }add(index, element) { if (arguments.length == 1) { element = index; index = this.size; } if (index < 0 || index > this.size) { throw new Error('index 参数错误') } let newNode = new Node(element); // index == 1 与 index != 1处理方式不同 if (index == 0) { newNode.next = this.head; this.head = newNode; } else { // 获取指定位置的前一个节点 let prevNode = this._getNode(--index); newNode.next = prevNode.next; prevNode.next = newNode; } this.size++; } remove(index) { if (this.size == 0) return undefined; this._checkIndex(index); let currentNode = this._getNode(index); if (index == 0) { this.head = currentNode.next; } else { let prevNode = this._getNode(index - 1); prevNode.next = currentNode.next; } this.size--; currentNode.next = null; return currentNode; }set(index, element) { this._checkIndex(index); this._getNode(index).element = element; }get(index) { this._checkIndex(index); let currentNode = this._getNode(index); currentNode.next = null; return currentNode; }clear() { this.head = null; this.size = 0; } } class Queue { constructor() { this.linkedList = new LinkedList(); } enQueue(data) { this.linkedList.add(data); } deQueue() { return this.linkedList.remove(0); } }const q = new Queue(); q.enQueue('node1'); q.enQueue('node2'); console.log(q.deQueue()); console.log(q.deQueue()); console.log(q.deQueue()); console.log(q)

模拟可写流:
const fs = require('fs'); const EventsEmitter = require('events'); const Queue = require('./linkedlist'); class MyWriteStream extends EventsEmitter { constructor(path, options = {}) { super(); this.path = path; this.flags = options.flags || 'w'; this.mode = options.mode || 438; this.autoClose = options.autoClose || true; this.start = options.start || 0; this.end = options.end this.encoding = options.encoding || 'utf8'; this.highWaterMark = options.highWaterMark || 16 * 1024; this.writeOffset = this.start; this.writing = false; this.writeLen = 0; this.needDrain = false; this.cache = new Queue(); this.open(); }open() { // 原生 fs.open fs.open(this.path, this.flags, (err, fd) => { if (err) { this.emit('error', err); return; } this.fd = fd; this.emit('open', fd); }) }write(chunk, encoding, cb) { // 统一成buffer chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); this.writeLen += chunk.length; let flag = this.writeLen < this.highWaterMark; this.needDrain = !flag; if (this.writing) { // 当前 是 正在写入状态,所以在这里将数据存入队列 this.cache.enQueue({ chunk, encoding, cb }) } else { // 当前 不是 正在写入状态,所以在这里执行写入 this.writing = true; this._write(chunk, encoding, cb); // this.writing = false; }return flag; }_write(chunk, encoding, cb) { if (typeof this.fd != 'number') { return this.once('open', () => { return this._write(chunk, encoding, () => { cb() // 清空排队的内容 this._clearBuffer(); }); }) } fs.write(this.fd, chunk, this.start, chunk.length, this.writeOffset, (err, written) => { this.writeOffset += written; this.writeLen -= written; cb && cb(); }) }_clearBuffer() { let data = https://www.it610.com/article/this.cache.deQueue(); if(data){ this._write(data.element.chunk,data.element.encoding,()=>{ data.element.cb(); this._clearBuffer(); }) }else{ if(this.needDrain){ this.needDrain = false; this.emit('drain') } } } }let mws = new MyWriteStream('f04.txt', { highWaterMark: 4 }); mws.on('open', (fd) => { console.log('open--->', fd) })

pipe方法的使用:
const fs = require('fs'); const rs = fs.createReadStream('./f04.txt', { highWaterMark: 4// 默认64kb }); const ws = fs.createWriteStream('./f04_copy.txt', { highWaterMark: 1// 默认16kb }) rs.pipe(ws); // data 需要查看数据,可监听rs data事件

【node常用内置模块(Stream)】自定义的pipe方法(有问题,没找出来):
const fs = require('fs'); const EventEmitter = require('events'); class MyFileReadStream extends EventEmitter { constructor(path, options = {}) {...} open() {...} read() {...} close() {...}pipe(ws){ this.on('data',(data)=>{ let flag = ws.write(data); if(!flag){ // 读数据的缓存满了。开启暂停 this.pause(); // 找不到该方法 } }); this.on('drain',()=>{ // 缓存中的数据被消费完了,继续开启数据读入缓存 this.resume(); }) } }

    推荐阅读