Stream 流原理深入
一句话概述:四种 Stream 类型、背压机制、pipe 实现原理、自定义 Transform——掌握 Node.js 高效处理大数据的核心
什么是 Stream?
定义:Stream 是 Node.js 中处理流式数据的抽象接口,允许以块(chunk)的方式读取或写入数据,而不是一次性加载到内存。
涉及场景:
- 大文件处理:读取/写入 GB 级文件
- 网络传输:HTTP 请求/响应、WebSocket
- 数据转换:压缩、加密、编码转换
- 实时数据:日志流、视频流
作用:
- 降低内存占用(固定缓冲区大小)
- 提高处理效率(边读边处理)
- 支持管道操作(pipe)
一、四种 Stream 类型
类型对比
| 类型 | 说明 | 常见实例 | 核心方法 |
|---|---|---|---|
| Readable | 可读流 | fs.createReadStream、http.IncomingMessage | read()、pipe() |
| Writable | 可写流 | fs.createWriteStream、http.ServerResponse | write()、end() |
| Duplex | 双工流(可读可写) | net.Socket、TCP sockets | 继承 Readable + Writable |
| Transform | 转换流(读写过程中转换) | zlib.createGzip、crypto.createCipher | _transform() |
Readable Stream 原理
两种读取模式
const fs = require('fs');
const readable = fs.createReadStream('file.txt');
// 1. 流动模式(flowing mode)- 自动读取
readable.on('data', (chunk) => {
console.log(`接收到 ${chunk.length} 字节`);
});
// 2. 暂停模式(paused mode)- 手动读取
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
console.log(`读取 ${chunk.length} 字节`);
}
});highWaterMark 缓冲区大小
// 默认 64KB
const readable1 = fs.createReadStream('file.txt');
// 自定义缓冲区大小为 1MB
const readable2 = fs.createReadStream('file.txt', {
highWaterMark: 1024 * 1024
});
// 对象模式下默认为 16 个对象
const readable3 = new Readable({
objectMode: true,
highWaterMark: 100 // 缓冲 100 个对象
});核心事件
const readable = fs.createReadStream('file.txt');
// data 事件 - 有数据可读(流动模式)
readable.on('data', (chunk) => {
console.log('接收数据:', chunk);
});
// readable 事件 - 有数据可读(暂停模式)
readable.on('readable', () => {
const chunk = readable.read();
});
// end 事件 - 数据读取完毕
readable.on('end', () => {
console.log('读取完成');
});
// error 事件 - 发生错误
readable.on('error', (err) => {
console.error('读取错误:', err);
});
// close 事件 - 流关闭
readable.on('close', () => {
console.log('流已关闭');
});模式切换
const readable = fs.createReadStream('file.txt');
// 暂停模式 → 流动模式
readable.pause(); // 暂停
readable.resume(); // 恢复
// 添加 data 监听器会自动切换到流动模式
readable.on('data', () => {});
// pipe() 也会切换到流动模式
readable.pipe(writable);Writable Stream 原理
基本写入操作
const fs = require('fs');
const writable = fs.createWriteStream('output.txt');
// write() 返回 boolean
const canContinue = writable.write('Hello ');
if (!canContinue) {
console.log('缓冲区已满,需要等待 drain 事件');
}
// end() 结束写入
writable.end('World!', () => {
console.log('写入完成');
});缓冲区机制
const writable = fs.createWriteStream('output.txt', {
highWaterMark: 16 * 1024 // 16KB 缓冲区
});
// 当缓冲区数据 >= highWaterMark 时,write() 返回 false
for (let i = 0; i < 100; i++) {
const canWrite = writable.write(`数据块 ${i}\n`);
if (!canWrite) {
console.log(`第 ${i} 次写入时缓冲区满`);
break;
}
}drain 事件处理背压
function writeMillionTimes(writer, data, callback) {
let i = 1000000;
function write() {
let ok = true;
while (i > 0 && ok) {
i--;
if (i === 0) {
// 最后一次写入
writer.write(data, callback);
} else {
// 继续写入
ok = writer.write(data);
}
}
if (i > 0) {
// 缓冲区满,等待 drain 事件
writer.once('drain', write);
}
}
write();
}核心事件
const writable = fs.createWriteStream('output.txt');
// drain 事件 - 缓冲区已清空,可以继续写入
writable.on('drain', () => {
console.log('缓冲区已清空');
});
// finish 事件 - 所有数据已写入底层系统
writable.on('finish', () => {
console.log('写入完成');
});
// error 事件 - 写入错误
writable.on('error', (err) => {
console.error('写入错误:', err);
});
// close 事件 - 流关闭
writable.on('close', () => {
console.log('流已关闭');
});Duplex Stream
定义:Duplex Stream 同时实现了 Readable 和 Writable 接口,读写操作相互独立。
典型应用场景
const net = require('net');
// TCP Socket 是典型的 Duplex Stream
const server = net.createServer((socket) => {
// socket 既可读又可写
// 读取客户端数据
socket.on('data', (data) => {
console.log('收到:', data.toString());
});
// 写入数据到客户端
socket.write('Hello from server\n');
// 双向管道
socket.pipe(socket); // echo server
});
server.listen(3000);独立的读写缓冲区
const { Duplex } = require('stream');
const duplex = new Duplex({
readableHighWaterMark: 64 * 1024, // 读缓冲区 64KB
writableHighWaterMark: 16 * 1024, // 写缓冲区 16KB
read(size) {
// 实现读取逻辑
this.push('data');
},
write(chunk, encoding, callback) {
// 实现写入逻辑
console.log('写入:', chunk.toString());
callback();
}
});自定义 Duplex Stream
const { Duplex } = require('stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
this.data = [];
}
_read(size) {
// 从内部缓存读取
const chunk = this.data.shift();
this.push(chunk || null);
}
_write(chunk, encoding, callback) {
// 写入到内部缓存
this.data.push(chunk);
callback();
}
}
const duplex = new MyDuplex();
duplex.write('Hello');
duplex.on('data', (chunk) => {
console.log(chunk.toString()); // 'Hello'
});Transform Stream
定义:Transform Stream 是特殊的 Duplex Stream,输出是输入的转换结果。
基本使用
const { Transform } = require('stream');
// 大写转换流
const upperCase = new Transform({
transform(chunk, encoding, callback) {
// 转换数据
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin
.pipe(upperCase)
.pipe(process.stdout);_transform 和 _flush
class MyTransform extends Transform {
constructor(options) {
super(options);
this.buffer = '';
}
_transform(chunk, encoding, callback) {
// 处理每个数据块
this.buffer += chunk.toString();
// 按行分割
const lines = this.buffer.split('\n');
this.buffer = lines.pop(); // 保留不完整的行
// 输出完整的行
lines.forEach(line => {
this.push(line.toUpperCase() + '\n');
});
callback();
}
_flush(callback) {
// 流结束时调用,处理剩余数据
if (this.buffer) {
this.push(this.buffer.toUpperCase());
}
callback();
}
}实际案例:JSON 行解析器
class JSONLineParser extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
for (const line of lines) {
try {
const obj = JSON.parse(line);
this.push(obj); // 输出对象
} catch (err) {
this.emit('error', err);
}
}
callback();
}
_flush(callback) {
if (this.buffer.trim()) {
try {
this.push(JSON.parse(this.buffer));
} catch (err) {
this.emit('error', err);
}
}
callback();
}
}
// 使用
fs.createReadStream('data.jsonl')
.pipe(new JSONLineParser())
.on('data', (obj) => {
console.log('解析对象:', obj);
});常见 Transform Stream
const zlib = require('zlib');
const crypto = require('crypto');
// Gzip 压缩
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
// 加密
fs.createReadStream('secret.txt')
.pipe(crypto.createCipher('aes192', 'password'))
.pipe(fs.createWriteStream('secret.enc'));二、背压(Backpressure)机制
什么是背压?
定义:当数据生产速度 > 消费速度时,未处理的数据会在内存中堆积,导致内存溢出。背压机制通过暂停数据生产来防止这个问题。
问题场景
const fs = require('fs');
// ❌ 错误:没有背压处理
const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', (chunk) => {
writable.write(chunk); // 忽略返回值,可能导致内存堆积
});内存堆积示例
// 快速读取 + 慢速写入 = 内存爆炸
const readable = fs.createReadStream('1GB-file.txt', {
highWaterMark: 64 * 1024 // 每次读 64KB
});
const writable = fs.createWriteStream('output.txt', {
highWaterMark: 16 * 1024 // 缓冲区只有 16KB
});
readable.on('data', (chunk) => {
// 写入速度跟不上读取速度
// 数据会堆积在 writable 的内部缓冲区
// 最终导致内存占用不断增长
writable.write(chunk);
});背压的表现
const writable = fs.createWriteStream('output.txt');
for (let i = 0; i < 1000000; i++) {
const canWrite = writable.write(`Line ${i}\n`);
if (!canWrite) {
console.log(`第 ${i} 次写入时触发背压`);
// 此时 writable 的内部缓冲区已满
// 继续写入会导致数据堆积在内存中
break;
}
}pipe() 如何处理背压
pipe() 自动处理背压
const fs = require('fs');
// ✅ 正确:pipe() 自动处理背压
fs.createReadStream('large-file.txt')
.pipe(fs.createWriteStream('output.txt'));
// pipe() 内部实现(简化版)
Readable.prototype.pipe = function(dest) {
const src = this;
src.on('data', (chunk) => {
const canWrite = dest.write(chunk);
if (!canWrite) {
// 写入缓冲区满,暂停读取
src.pause();
}
});
dest.on('drain', () => {
// 写入缓冲区清空,恢复读取
src.resume();
});
src.on('end', () => {
dest.end();
});
return dest;
};pipe() 的优势
// 自动处理背压
// 自动处理错误
// 自动清理资源
// 支持链式调用
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'password'))
.pipe(fs.createWriteStream('output.enc.gz'))
.on('finish', () => {
console.log('处理完成');
});pipeline() 更好的错误处理
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);
// pipeline 会自动处理背压和错误
await pipelineAsync(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('output.gz')
);
// 任何流发生错误都会自动清理所有流
pipeline(
fs.createReadStream('input.txt'),
transform1,
transform2,
fs.createWriteStream('output.txt'),
(err) => {
if (err) {
console.error('Pipeline 失败:', err);
} else {
console.log('Pipeline 成功');
}
}
);手动处理背压
监听 write() 返回值
const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
// 缓冲区满,暂停读取
readable.pause();
console.log('暂停读取,等待 drain');
}
});
writable.on('drain', () => {
// 缓冲区清空,恢复读取
console.log('恢复读取');
readable.resume();
});
readable.on('end', () => {
writable.end();
});完整的背压处理示例
class BackpressureHandler {
constructor(readable, writable) {
this.readable = readable;
this.writable = writable;
this.isPaused = false;
}
start() {
this.readable.on('data', (chunk) => {
const canWrite = this.writable.write(chunk);
if (!canWrite && !this.isPaused) {
this.isPaused = true;
this.readable.pause();
console.log('触发背压,暂停读取');
}
});
this.writable.on('drain', () => {
if (this.isPaused) {
this.isPaused = false;
this.readable.resume();
console.log('背压解除,恢复读取');
}
});
this.readable.on('end', () => {
this.writable.end();
});
this.readable.on('error', (err) => {
console.error('读取错误:', err);
this.writable.destroy(err);
});
this.writable.on('error', (err) => {
console.error('写入错误:', err);
this.readable.destroy(err);
});
}
}
const handler = new BackpressureHandler(
fs.createReadStream('input.txt'),
fs.createWriteStream('output.txt')
);
handler.start();三、自定义 Stream 实现
自定义 Readable Stream
基本实现
const { Readable } = require('stream');
class CounterStream extends Readable {
constructor(max, options) {
super(options);
this.max = max;
this.current = 0;
}
_read(size) {
// 当需要数据时调用
if (this.current < this.max) {
this.push(`${this.current}\n`);
this.current++;
} else {
// 推送 null 表示流结束
this.push(null);
}
}
}
const counter = new CounterStream(10);
counter.pipe(process.stdout);异步数据源
class AsyncDataStream extends Readable {
constructor(options) {
super(options);
this.index = 0;
}
async _read(size) {
try {
// 模拟异步获取数据
const data = await this.fetchData(this.index);
if (data) {
this.push(data);
this.index++;
} else {
this.push(null);
}
} catch (err) {
this.destroy(err);
}
}
async fetchData(index) {
// 模拟 API 调用
await new Promise(resolve => setTimeout(resolve, 100));
return index < 10 ? `Data ${index}\n` : null;
}
}对象模式 Readable
class ObjectStream extends Readable {
constructor(data, options) {
super({ ...options, objectMode: true });
this.data = data;
this.index = 0;
}
_read(size) {
if (this.index < this.data.length) {
// 推送对象而不是 Buffer
this.push(this.data[this.index]);
this.index++;
} else {
this.push(null);
}
}
}
const stream = new ObjectStream([
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' }
]);
stream.on('data', (obj) => {
console.log(obj); // { id: 1, name: 'Alice' }
});自定义 Writable Stream
基本实现
const { Writable } = require('stream');
class ConsoleStream extends Writable {
_write(chunk, encoding, callback) {
// 处理数据块
console.log(`[LOG] ${chunk.toString()}`);
// 调用 callback 表示写入完成
callback();
}
}
const logger = new ConsoleStream();
logger.write('Hello World\n');
logger.end();批量写入优化
class BatchWriter extends Writable {
constructor(options) {
super(options);
this.batch = [];
this.batchSize = 100;
}
_write(chunk, encoding, callback) {
this.batch.push(chunk.toString());
if (this.batch.length >= this.batchSize) {
this.flush();
}
callback();
}
_final(callback) {
// 流结束时调用,写入剩余数据
this.flush();
callback();
}
flush() {
if (this.batch.length > 0) {
console.log(`批量写入 ${this.batch.length} 条记录`);
// 实际写入操作
this.batch = [];
}
}
}数据库写入流
class DatabaseWriter extends Writable {
constructor(db, table, options) {
super({ ...options, objectMode: true });
this.db = db;
this.table = table;
}
async _write(record, encoding, callback) {
try {
await this.db.insert(this.table, record);
callback();
} catch (err) {
callback(err);
}
}
async _writev(chunks, callback) {
// 批量写入优化
try {
const records = chunks.map(chunk => chunk.chunk);
await this.db.batchInsert(this.table, records);
callback();
} catch (err) {
callback(err);
}
}
}自定义 Transform Stream
CSV 转 JSON 转换器
const { Transform } = require('stream');
class CSVToJSON extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
this.headers = null;
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
for (const line of lines) {
if (!this.headers) {
this.headers = line.split(',');
} else {
const values = line.split(',');
const obj = {};
this.headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(obj);
}
}
callback();
}
_flush(callback) {
if (this.buffer && this.headers) {
const values = this.buffer.split(',');
const obj = {};
this.headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(obj);
}
callback();
}
}
// 使用
fs.createReadStream('data.csv')
.pipe(new CSVToJSON())
.on('data', (obj) => {
console.log(obj); // { name: 'Alice', age: '30' }
});数据验证转换流
class ValidatorStream extends Transform {
constructor(schema, options) {
super({ ...options, objectMode: true });
this.schema = schema;
}
_transform(data, encoding, callback) {
try {
// 验证数据
const validated = this.validate(data);
this.push(validated);
callback();
} catch (err) {
// 可以选择跳过无效数据或报错
this.emit('invalid', { data, error: err });
callback(); // 继续处理下一条
}
}
validate(data) {
// 简单验证逻辑
if (typeof data.age !== 'number' || data.age < 0) {
throw new Error('Invalid age');
}
return data;
}
}压缩统计流
class CompressionStats extends Transform {
constructor(options) {
super(options);
this.bytesIn = 0;
this.bytesOut = 0;
}
_transform(chunk, encoding, callback) {
this.bytesIn += chunk.length;
// 实际压缩(这里简化)
const compressed = zlib.gzipSync(chunk);
this.bytesOut += compressed.length;
this.push(compressed);
callback();
}
_flush(callback) {
const ratio = ((1 - this.bytesOut / this.bytesIn) * 100).toFixed(2);
console.log(`压缩率: ${ratio}%`);
console.log(`原始: ${this.bytesIn} 字节`);
console.log(`压缩后: ${this.bytesOut} 字节`);
callback();
}
}四、Stream 性能优化
缓冲区大小调优
highWaterMark 对性能的影响
const fs = require('fs');
// 小缓冲区:频繁 I/O,CPU 开销大
const small = fs.createReadStream('large-file.txt', {
highWaterMark: 1024 // 1KB
});
// 默认缓冲区:平衡性能
const normal = fs.createReadStream('large-file.txt', {
highWaterMark: 64 * 1024 // 64KB(默认)
});
// 大缓冲区:减少 I/O 次数,但占用更多内存
const large = fs.createReadStream('large-file.txt', {
highWaterMark: 1024 * 1024 // 1MB
});性能测试
async function benchmarkBufferSize(filePath, bufferSize) {
return new Promise((resolve) => {
const start = Date.now();
let chunks = 0;
const readable = fs.createReadStream(filePath, {
highWaterMark: bufferSize
});
readable.on('data', () => {
chunks++;
});
readable.on('end', () => {
const duration = Date.now() - start;
resolve({ bufferSize, duration, chunks });
});
});
}
// 测试不同缓冲区大小
const sizes = [1024, 16 * 1024, 64 * 1024, 256 * 1024, 1024 * 1024];
for (const size of sizes) {
const result = await benchmarkBufferSize('large-file.txt', size);
console.log(`${size / 1024}KB: ${result.duration}ms, ${result.chunks} chunks`);
}
// 输出示例:
// 1KB: 1250ms, 10000 chunks
// 16KB: 850ms, 625 chunks
// 64KB: 720ms, 156 chunks
// 256KB: 680ms, 39 chunks
// 1024KB: 650ms, 10 chunks根据场景选择缓冲区大小
// 网络流:较小缓冲区,降低延迟
const networkStream = net.createConnection({
highWaterMark: 16 * 1024 // 16KB
});
// 文件流:较大缓冲区,提高吞吐量
const fileStream = fs.createReadStream('file.txt', {
highWaterMark: 256 * 1024 // 256KB
});
// 对象流:按对象数量设置
const objectStream = new Readable({
objectMode: true,
highWaterMark: 100 // 100 个对象
});对象模式 vs 二进制模式
二进制模式(默认)
const { Readable } = require('stream');
// 默认处理 Buffer/String
const binaryStream = new Readable({
read() {
this.push(Buffer.from('Hello'));
this.push(null);
}
});
binaryStream.on('data', (chunk) => {
console.log(chunk); // <Buffer 48 65 6c 6c 6f>
console.log(chunk instanceof Buffer); // true
});对象模式
// 处理任意 JavaScript 对象
const objectStream = new Readable({
objectMode: true,
read() {
this.push({ id: 1, name: 'Alice' });
this.push({ id: 2, name: 'Bob' });
this.push(null);
}
});
objectStream.on('data', (obj) => {
console.log(obj); // { id: 1, name: 'Alice' }
console.log(typeof obj); // 'object'
});混合使用
// 二进制输入 → 对象输出
class JSONParser extends Transform {
constructor() {
super({
readableObjectMode: true, // 输出对象
writableObjectMode: false // 输入二进制
});
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
for (const line of lines) {
try {
this.push(JSON.parse(line));
} catch (err) {
// 忽略无效 JSON
}
}
callback();
}
}
fs.createReadStream('data.jsonl') // 二进制
.pipe(new JSONParser()) // 输出对象
.on('data', (obj) => {
console.log(obj); // JavaScript 对象
});性能对比
// 对象模式:highWaterMark 表示对象数量
const objStream = new Readable({
objectMode: true,
highWaterMark: 16 // 缓冲 16 个对象
});
// 二进制模式:highWaterMark 表示字节数
const binStream = new Readable({
objectMode: false,
highWaterMark: 16 * 1024 // 缓冲 16KB
});
// 对象模式适合:
// - 数据库记录流
// - JSON 数据处理
// - 业务对象传输
// 二进制模式适合:
// - 文件 I/O
// - 网络传输
// - 大数据处理面试高频题
1. Stream 的四种类型分别是什么?
四种类型:
Readable Stream(可读流)
- 用于读取数据
- 示例:
fs.createReadStream()、http.IncomingMessage - 核心方法:
read()、pipe() - 两种模式:流动模式(flowing)和暂停模式(paused)
Writable Stream(可写流)
- 用于写入数据
- 示例:
fs.createWriteStream()、http.ServerResponse - 核心方法:
write()、end() - 关键事件:
drain(缓冲区清空)、finish(写入完成)
Duplex Stream(双工流)
- 同时可读可写,读写操作独立
- 示例:
net.Socket、TCP 连接 - 特点:有独立的读写缓冲区
Transform Stream(转换流)
- 特殊的 Duplex Stream,输出是输入的转换
- 示例:
zlib.createGzip()、crypto.createCipher() - 核心方法:
_transform()、_flush()
关系图:
Readable Writable
\ /
Duplex
|
Transform2. 什么是背压?如何处理?
背压定义: 当数据生产速度大于消费速度时,未处理的数据会在内存中堆积,导致内存溢出。背压机制通过暂停数据生产来防止这个问题。
产生原因:
// 快速读取 + 慢速写入 = 内存堆积
const readable = fs.createReadStream('large-file.txt'); // 快
const writable = fs.createWriteStream('slow-device.txt'); // 慢
readable.on('data', chunk => {
writable.write(chunk); // 忽略返回值,危险!
});处理方法:
- 使用 pipe()(推荐)
readable.pipe(writable); // 自动处理背压- 使用 pipeline()(更好的错误处理)
const { pipeline } = require('stream');
pipeline(readable, transform, writable, (err) => {
if (err) console.error('失败:', err);
});- 手动处理
readable.on('data', chunk => {
const canContinue = writable.write(chunk);
if (!canContinue) {
readable.pause(); // 暂停读取
}
});
writable.on('drain', () => {
readable.resume(); // 恢复读取
});关键点:
write()返回false表示缓冲区满drain事件表示缓冲区清空pipe()自动调用pause()/resume()
3. pipe() 方法的实现原理?
简化实现:
Readable.prototype.pipe = function(dest, options) {
const src = this;
// 1. 监听 data 事件,写入目标流
src.on('data', (chunk) => {
const canWrite = dest.write(chunk);
// 2. 处理背压:写入缓冲区满时暂停读取
if (!canWrite) {
src.pause();
}
});
// 3. 监听 drain 事件,恢复读取
dest.on('drain', () => {
src.resume();
});
// 4. 数据读取完毕,关闭目标流
src.on('end', () => {
if (options?.end !== false) {
dest.end();
}
});
// 5. 错误处理
src.on('error', (err) => {
dest.destroy(err);
});
// 6. 返回目标流,支持链式调用
return dest;
};核心机制:
- 自动切换到流动模式:监听
data事件 - 背压处理:
write()返回false时pause() - 恢复流动:
drain事件时resume() - 资源清理:
end事件时关闭目标流 - 链式调用:返回目标流
使用示例:
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'pwd'))
.pipe(fs.createWriteStream('output.enc.gz'));4. 如何实现一个自定义的 Transform Stream?
基本步骤:
const { Transform } = require('stream');
class MyTransform extends Transform {
constructor(options) {
super(options);
// 初始化状态
}
// 1. 实现 _transform 方法(必须)
_transform(chunk, encoding, callback) {
try {
// 处理数据
const transformed = this.process(chunk);
// 推送转换后的数据
this.push(transformed);
// 调用 callback 表示完成
callback();
} catch (err) {
callback(err);
}
}
// 2. 实现 _flush 方法(可选)
_flush(callback) {
// 流结束时处理剩余数据
if (this.buffer) {
this.push(this.buffer);
}
callback();
}
process(chunk) {
// 实际转换逻辑
return chunk;
}
}实际案例:大写转换
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}
// 使用
process.stdin
.pipe(new UpperCaseTransform())
.pipe(process.stdout);关键点:
- 继承 Transform 类
- 实现
_transform(chunk, encoding, callback)chunk: 输入数据encoding: 编码类型callback: 完成回调,callback(err)或callback()
- 使用
this.push()输出数据 - 可选实现
_flush(callback)处理剩余数据 - 支持对象模式:
super({ objectMode: true })
5. Stream 和一次性读取文件的性能差异?
内存占用对比:
// 一次性读取 1GB 文件
const data = fs.readFileSync('1GB-file.txt');
// 内存占用:~1GB
// 流式读取 1GB 文件
fs.createReadStream('1GB-file.txt')
.pipe(fs.createWriteStream('output.txt'));
// 内存占用:~64KB(缓冲区大小)性能测试:
// 测试 100MB 文件
// 方法 1:一次性读取
console.time('readFile');
const data = fs.readFileSync('100MB.txt');
fs.writeFileSync('output1.txt', data);
console.timeEnd('readFile');
// 时间:~500ms
// 内存峰值:~100MB
// 方法 2:Stream
console.time('stream');
fs.createReadStream('100MB.txt')
.pipe(fs.createWriteStream('output2.txt'))
.on('finish', () => {
console.timeEnd('stream');
});
// 时间:~550ms
// 内存峰值:~128KB对比总结:
| 特性 | 一次性读取 | Stream |
|---|---|---|
| 内存占用 | 文件大小 | 固定缓冲区(~64KB) |
| 速度 | 略快 | 略慢 |
| 大文件 | 可能 OOM | 无限大小 |
| 适用场景 | 小文件(<10MB) | 大文件、实时处理 |
| 处理时机 | 全部读取后 | 边读边处理 |
使用建议:
- 小文件(<10MB):使用
readFile()/readFileSync() - 大文件(>10MB):使用 Stream
- 实时处理:必须使用 Stream
- 网络传输:必须使用 Stream
Stream 的优势:
- 内存友好:固定缓冲区大小
- 无文件大小限制:可处理 GB 级文件
- 边读边处理:提高响应速度
- 组合性强:通过 pipe 链接多个操作