模拟可读流,可写流实现

可读流模拟实现

const fs = require("fs");
const EventEmitter = require("events");

/**
 * Simplified imitation of a file-backed readable stream.
 *
 * Events emitted: "open" (fd), "data" (Buffer), "end", "close", "error" (Error).
 * Reading starts when the first "data" listener is attached and can be
 * suspended / continued with pause() / resume().
 */
class MyFileReadStream extends EventEmitter {
  /**
   * @param {string} path - File to read.
   * @param {object} [options]
   * @param {string} [options.flags="r"] - Flags passed to fs.open.
   * @param {number} [options.mode=438] - File mode passed to fs.open (438 === 0o666).
   * @param {boolean} [options.autoClose=true] - Close the fd after "end" or a read error.
   * @param {number} [options.start=0] - Byte offset of the first byte to read.
   * @param {number} [options.end] - Byte offset of the last byte to read (inclusive).
   * @param {number} [options.highWaterMark=65536] - Maximum bytes per "data" chunk.
   */
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || "r";
    // Older revisions stored the flags under `flag`; keep the alias readable.
    this.flag = this.flags;
    this.mode = options.mode || 438;
    // `options.autoClose || true` could never be false, so test for undefined instead.
    this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    // Reading begins at `start`, not always at byte 0.
    this.readOffset = this.start;
    this.flowing = false; // true while chunks should keep being delivered
    this.reading = false; // true while an fs.read call is in flight
    this.ended = false; // true once "end" has been emitted
    this.closed = false; // true once close() has been requested

    this.open();

    this.on("newListener", (type) => {
      // Only the first "data" listener starts the read loop; later ones share it.
      if (type === "data" && !this.flowing) {
        this.read();
      }
    });
  }

  /** Opens the file and emits "open" with the descriptor, or "error" on failure. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit("error", err);
        return;
      }
      if (this.closed) {
        // close() was requested before the descriptor existed: release it right away.
        fs.close(fd, () => {});
        return;
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }

  /**
   * Reads the next chunk and emits it as "data"; keeps going while the stream
   * is flowing. Emits "end" once no bytes are left in the requested range.
   */
  read() {
    this.flowing = true;
    if (this.ended || this.closed) {
      return;
    }
    if (typeof this.fd !== "number") {
      // The descriptor only exists after "open"; retry then, unless paused meanwhile.
      this.once("open", () => {
        if (this.flowing) {
          this.read();
        }
      });
      return;
    }
    if (this.reading) {
      // A read is already in flight; its callback continues the loop.
      return;
    }
    this.reading = true;

    const buf = Buffer.alloc(this.highWaterMark);
    // `end` is inclusive; compare against undefined so that `end: 0` is honoured.
    const remaining =
      this.end !== undefined ? this.end - this.readOffset + 1 : this.highWaterMark;
    // Clamp at 0 so a start beyond `end` finishes cleanly instead of throwing.
    const howMuchToRead = Math.max(0, Math.min(remaining, this.highWaterMark));

    fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
      this.reading = false;
      if (err) {
        this.emit("error", err);
        if (this.autoClose) {
          this.close();
        }
        return;
      }
      if (readBytes > 0) {
        this.readOffset += readBytes;
        this.emit("data", buf.subarray(0, readBytes));
        // A listener may have paused the stream while handling the chunk.
        if (this.flowing) {
          this.read();
        }
        return;
      }
      this.ended = true;
      this.emit("end");
      if (this.autoClose) {
        this.close();
      }
    });
  }

  /** Stops delivering further chunks until resume() is called. */
  pause() {
    this.flowing = false;
    return this;
  }

  /** Continues (or starts) delivering chunks. */
  resume() {
    if (!this.flowing) {
      this.read();
    }
    return this;
  }

  /** Closes the descriptor (if any) and emits "close" exactly once. */
  close() {
    if (this.closed) {
      return;
    }
    this.closed = true;
    if (typeof this.fd !== "number") {
      // Nothing was opened; still notify listeners asynchronously.
      process.nextTick(() => this.emit("close"));
      return;
    }
    const fd = this.fd;
    this.fd = null;
    fs.close(fd, (err) => {
      if (err) {
        this.emit("error", err);
      }
      this.emit("close");
    });
  }

  /**
   * Forwards every chunk to `ws`, pausing while `ws.write` reports a full
   * buffer and resuming when `ws` emits "drain".
   *
   * @param {{write: Function, on: Function}} ws - Destination writable.
   * @returns {object} The destination, to allow chaining.
   */
  pipe(ws) {
    this.on("data", (data) => {
      if (!ws.write(data)) {
        this.pause();
      }
    });
    ws.on("drain", () => {
      this.resume();
    });
    return ws;
  }
}

let rs = new MyFileReadStream("test.txt", {
  end: 7,
  highWaterMark: 3,
});

// Event-trigger demo
// rs.on("open", (fd) => {
//   console.log("open", fd);
// });
// rs.on("error", (err) => {
//   console.log(err);
// });
rs.on("data", (chunk) => {
  console.log(chunk);
});
// rs.on("end", () => {
//   console.log("end");
// });
rs.on("close", () => {
  console.log("close");
});

可写流模拟实现
const fs = require("fs");
const EventEmitter = require("events");
const { Queue } = require("./linkedList"); // queue holding writes that wait their turn

/**
 * Simplified imitation of a file-backed writable stream.
 *
 * Only one fs.write is in flight at a time; writes issued meanwhile are
 * queued in `cache` and flushed in order. Events emitted: "open" (fd),
 * "drain", "error" (Error).
 */
class MyWriteStream extends EventEmitter {
  /**
   * @param {string} path - File to write.
   * @param {object} [options]
   * @param {string} [options.flags="w"] - Flags passed to fs.open.
   * @param {number} [options.mode=438] - File mode passed to fs.open (438 === 0o666).
   * @param {boolean} [options.autoClose=true] - Stored on the instance; not used by this class.
   * @param {number} [options.start=0] - File position of the first write.
   * @param {string} [options.encoding="utf8"] - Default encoding for string chunks.
   * @param {number} [options.highWaterMark=16384] - Pending-byte threshold for write()'s return value.
   */
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || "w";
    this.mode = options.mode || 438;
    // `options.autoClose || true` could never be false, so test for undefined instead.
    this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
    this.start = options.start || 0;
    this.encoding = options.encoding || "utf8";
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    this.writeOffset = this.start; // file position of the next write
    this.writting = false; // true while a write (or queue flush) is in progress
    this.writLen = 0; // bytes accepted by write() but not yet written
    this.needDrain = false; // true when "drain" must be emitted after the flush
    this.cache = new Queue();
    this.open();
  }

  /** Opens the file and emits "open" with the descriptor, or "error" on failure. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit("error", err);
        // Do not fall through and announce an fd that does not exist.
        return;
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }

  /**
   * Accepts a chunk for writing.
   *
   * @param {string|Buffer} chunk - Data to write.
   * @param {string|Function} [encoding] - Encoding for string chunks, or the callback.
   * @param {Function} [cb] - Called once this chunk has been written.
   * @returns {boolean} false once the pending bytes reach highWaterMark
   *   (the caller should wait for "drain"), true otherwise.
   */
  write(chunk, encoding, cb) {
    // Support write(chunk, cb) as well as write(chunk, encoding, cb).
    const done = typeof encoding === "function" ? encoding : cb;
    const enc = typeof encoding === "string" ? encoding : this.encoding;
    // Only strings and Buffers are considered.
    const data = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, enc);

    this.writLen += data.length;
    const belowMark = this.writLen < this.highWaterMark;
    this.needDrain = !belowMark;

    if (this.writting) {
      // A write is in flight: queue this one behind it.
      this.cache.enQueue({ chunk: data, encoding: enc, cb: done });
    } else {
      this.writting = true;
      this._write(data, enc, () => {
        if (typeof done === "function") {
          done();
        }
        // Flush whatever was queued in the meantime.
        this._clearBuffer();
      });
    }
    return belowMark;
  }

  /**
   * Writes one buffer at the current file position, waiting for "open" if
   * the descriptor is not available yet.
   */
  _write(chunk, encoding, cb) {
    if (typeof this.fd !== "number") {
      this.once("open", () => {
        this._write(chunk, encoding, cb);
      });
      return;
    }
    // The third argument is the offset inside `chunk`, so it is always 0;
    // the position in the file is `writeOffset`.
    fs.write(this.fd, chunk, 0, chunk.length, this.writeOffset, (err, written) => {
      if (err) {
        this.emit("error", err);
        return;
      }
      this.writeOffset += written;
      this.writLen -= written;
      if (written < chunk.length) {
        // Short write: push out the remainder before reporting completion.
        this._write(chunk.subarray(written), encoding, cb);
        return;
      }
      cb && cb();
    });
  }

  /**
   * Writes queued chunks one after another; when the queue is empty, marks
   * the stream idle and emits "drain" if a writer was told to wait.
   */
  _clearBuffer() {
    const data = this.cache.deQueue();
    if (data) {
      this._write(data.element.chunk, data.element.encoding, () => {
        data.element.cb && data.element.cb();
        this._clearBuffer();
      });
      return;
    }
    // Queue is empty: allow the next write() to go straight to the file.
    this.writting = false;
    if (this.needDrain) {
      this.needDrain = false;
      this.emit("drain");
    }
  }
}

const ws = new MyWriteStream("test.txt", { highWaterMark: 1 });

ws.on("open", (fd) => {
  console.log("open===>", fd);
});

let flag = ws.write("1", "utf8", () => {
  console.log("ok1");
});
// console.log(flag);

flag = ws.write("10", "utf8", () => {
  console.log("ok2");
});
// console.log(flag);

flag = ws.write("前端前端", "utf8", () => {
  console.log("ok222");
});

ws.on("drain", () => {
  console.log("drain");
});

    推荐阅读