Skip to content

Stream 流原理深入

一句话概述:四种 Stream 类型、背压机制、pipe 实现原理、自定义 Transform——掌握 Node.js 高效处理大数据的核心

什么是 Stream?

定义:Stream 是 Node.js 中处理流式数据的抽象接口,允许以块(chunk)的方式读取或写入数据,而不是一次性加载到内存。

涉及场景

  • 大文件处理:读取/写入 GB 级文件
  • 网络传输:HTTP 请求/响应、WebSocket
  • 数据转换:压缩、加密、编码转换
  • 实时数据:日志流、视频流

作用

  1. 降低内存占用(固定缓冲区大小)
  2. 提高处理效率(边读边处理)
  3. 支持管道操作(pipe)

一、四种 Stream 类型

类型对比

类型说明常见实例核心方法
Readable可读流fs.createReadStreamhttp.IncomingMessageread()pipe()
Writable可写流fs.createWriteStreamhttp.ServerResponsewrite()end()
Duplex双工流(可读可写)net.Socket、TCP sockets继承 Readable + Writable
Transform转换流(读写过程中转换)zlib.createGzipcrypto.createCipher_transform()

Readable Stream 原理

两种读取模式

javascript
const fs = require('fs');
const readable = fs.createReadStream('file.txt');

// 1. 流动模式(flowing mode)- 自动读取
readable.on('data', (chunk) => {
  console.log(`接收到 ${chunk.length} 字节`);
});

// 2. 暂停模式(paused mode)- 手动读取
readable.on('readable', () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {
    console.log(`读取 ${chunk.length} 字节`);
  }
});

highWaterMark 缓冲区大小

javascript
// 默认 64KB
const readable1 = fs.createReadStream('file.txt');

// 自定义缓冲区大小为 1MB
const readable2 = fs.createReadStream('file.txt', {
  highWaterMark: 1024 * 1024
});

// 对象模式下默认为 16 个对象
const readable3 = new Readable({
  objectMode: true,
  highWaterMark: 100 // 缓冲 100 个对象
});

核心事件

javascript
const readable = fs.createReadStream('file.txt');

// data 事件 - 有数据可读(流动模式)
readable.on('data', (chunk) => {
  console.log('接收数据:', chunk);
});

// readable 事件 - 有数据可读(暂停模式)
readable.on('readable', () => {
  const chunk = readable.read();
});

// end 事件 - 数据读取完毕
readable.on('end', () => {
  console.log('读取完成');
});

// error 事件 - 发生错误
readable.on('error', (err) => {
  console.error('读取错误:', err);
});

// close 事件 - 流关闭
readable.on('close', () => {
  console.log('流已关闭');
});

模式切换

javascript
const readable = fs.createReadStream('file.txt');

// 暂停模式 → 流动模式
readable.pause(); // 暂停
readable.resume(); // 恢复

// 添加 data 监听器会自动切换到流动模式
readable.on('data', () => {});

// pipe() 也会切换到流动模式
readable.pipe(writable);

Writable Stream 原理

基本写入操作

javascript
const fs = require('fs');
const writable = fs.createWriteStream('output.txt');

// write() 返回 boolean
const canContinue = writable.write('Hello ');
if (!canContinue) {
  console.log('缓冲区已满,需要等待 drain 事件');
}

// end() 结束写入
writable.end('World!', () => {
  console.log('写入完成');
});

缓冲区机制

javascript
const writable = fs.createWriteStream('output.txt', {
  highWaterMark: 16 * 1024 // 16KB 缓冲区
});

// 当缓冲区数据 >= highWaterMark 时,write() 返回 false
for (let i = 0; i < 100; i++) {
  const canWrite = writable.write(`数据块 ${i}\n`);
  
  if (!canWrite) {
    console.log(`第 ${i} 次写入时缓冲区满`);
    break;
  }
}

drain 事件处理背压

javascript
function writeMillionTimes(writer, data, callback) {
  let i = 1000000;
  
  function write() {
    let ok = true;
    
    while (i > 0 && ok) {
      i--;
      
      if (i === 0) {
        // 最后一次写入
        writer.write(data, callback);
      } else {
        // 继续写入
        ok = writer.write(data);
      }
    }
    
    if (i > 0) {
      // 缓冲区满,等待 drain 事件
      writer.once('drain', write);
    }
  }
  
  write();
}

核心事件

javascript
const writable = fs.createWriteStream('output.txt');

// drain 事件 - 缓冲区已清空,可以继续写入
writable.on('drain', () => {
  console.log('缓冲区已清空');
});

// finish 事件 - 所有数据已写入底层系统
writable.on('finish', () => {
  console.log('写入完成');
});

// error 事件 - 写入错误
writable.on('error', (err) => {
  console.error('写入错误:', err);
});

// close 事件 - 流关闭
writable.on('close', () => {
  console.log('流已关闭');
});

Duplex Stream

定义:Duplex Stream 同时实现了 Readable 和 Writable 接口,读写操作相互独立。

典型应用场景

javascript
const net = require('net');

// TCP Socket 是典型的 Duplex Stream
const server = net.createServer((socket) => {
  // socket 既可读又可写
  
  // 读取客户端数据
  socket.on('data', (data) => {
    console.log('收到:', data.toString());
  });
  
  // 写入数据到客户端
  socket.write('Hello from server\n');
  
  // 双向管道
  socket.pipe(socket); // echo server
});

server.listen(3000);

独立的读写缓冲区

javascript
const { Duplex } = require('stream');

const duplex = new Duplex({
  readableHighWaterMark: 64 * 1024,  // 读缓冲区 64KB
  writableHighWaterMark: 16 * 1024,  // 写缓冲区 16KB
  
  read(size) {
    // 实现读取逻辑
    this.push('data');
  },
  
  write(chunk, encoding, callback) {
    // 实现写入逻辑
    console.log('写入:', chunk.toString());
    callback();
  }
});

自定义 Duplex Stream

javascript
const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.data = [];
  }
  
  _read(size) {
    // 从内部缓存读取
    const chunk = this.data.shift();
    this.push(chunk || null);
  }
  
  _write(chunk, encoding, callback) {
    // 写入到内部缓存
    this.data.push(chunk);
    callback();
  }
}

const duplex = new MyDuplex();
duplex.write('Hello');
duplex.on('data', (chunk) => {
  console.log(chunk.toString()); // 'Hello'
});

Transform Stream

定义:Transform Stream 是特殊的 Duplex Stream,输出是输入的转换结果。

基本使用

javascript
const { Transform } = require('stream');

// 大写转换流
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    // 转换数据
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin
  .pipe(upperCase)
  .pipe(process.stdout);

_transform 和 _flush

javascript
class MyTransform extends Transform {
  constructor(options) {
    super(options);
    this.buffer = '';
  }
  
  _transform(chunk, encoding, callback) {
    // 处理每个数据块
    this.buffer += chunk.toString();
    
    // 按行分割
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // 保留不完整的行
    
    // 输出完整的行
    lines.forEach(line => {
      this.push(line.toUpperCase() + '\n');
    });
    
    callback();
  }
  
  _flush(callback) {
    // 流结束时调用,处理剩余数据
    if (this.buffer) {
      this.push(this.buffer.toUpperCase());
    }
    callback();
  }
}

实际案例:JSON 行解析器

javascript
class JSONLineParser extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.buffer = '';
  }
  
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop();
    
    for (const line of lines) {
      try {
        const obj = JSON.parse(line);
        this.push(obj); // 输出对象
      } catch (err) {
        this.emit('error', err);
      }
    }
    
    callback();
  }
  
  _flush(callback) {
    if (this.buffer.trim()) {
      try {
        this.push(JSON.parse(this.buffer));
      } catch (err) {
        this.emit('error', err);
      }
    }
    callback();
  }
}

// 使用
fs.createReadStream('data.jsonl')
  .pipe(new JSONLineParser())
  .on('data', (obj) => {
    console.log('解析对象:', obj);
  });

常见 Transform Stream

javascript
const zlib = require('zlib');
const crypto = require('crypto');

// Gzip 压缩
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));

// 加密
fs.createReadStream('secret.txt')
  .pipe(crypto.createCipher('aes192', 'password'))
  .pipe(fs.createWriteStream('secret.enc'));

二、背压(Backpressure)机制

什么是背压?

定义:当数据生产速度 > 消费速度时,未处理的数据会在内存中堆积,导致内存溢出。背压机制通过暂停数据生产来防止这个问题。

问题场景

javascript
const fs = require('fs');

// ❌ 错误:没有背压处理
const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  writable.write(chunk); // 忽略返回值,可能导致内存堆积
});

内存堆积示例

javascript
// 快速读取 + 慢速写入 = 内存爆炸
const readable = fs.createReadStream('1GB-file.txt', {
  highWaterMark: 64 * 1024 // 每次读 64KB
});

const writable = fs.createWriteStream('output.txt', {
  highWaterMark: 16 * 1024 // 缓冲区只有 16KB
});

readable.on('data', (chunk) => {
  // 写入速度跟不上读取速度
  // 数据会堆积在 writable 的内部缓冲区
  // 最终导致内存占用不断增长
  writable.write(chunk);
});

背压的表现

javascript
const writable = fs.createWriteStream('output.txt');

for (let i = 0; i < 1000000; i++) {
  const canWrite = writable.write(`Line ${i}\n`);
  
  if (!canWrite) {
    console.log(`第 ${i} 次写入时触发背压`);
    // 此时 writable 的内部缓冲区已满
    // 继续写入会导致数据堆积在内存中
    break;
  }
}

pipe() 如何处理背压

pipe() 自动处理背压

javascript
const fs = require('fs');

// ✅ 正确:pipe() 自动处理背压
fs.createReadStream('large-file.txt')
  .pipe(fs.createWriteStream('output.txt'));

// pipe() 内部实现(简化版)
Readable.prototype.pipe = function(dest) {
  const src = this;
  
  src.on('data', (chunk) => {
    const canWrite = dest.write(chunk);
    
    if (!canWrite) {
      // 写入缓冲区满,暂停读取
      src.pause();
    }
  });
  
  dest.on('drain', () => {
    // 写入缓冲区清空,恢复读取
    src.resume();
  });
  
  src.on('end', () => {
    dest.end();
  });
  
  return dest;
};

pipe() 的优势

javascript
// 自动处理背压
// 自动处理错误
// 自动清理资源
// 支持链式调用

fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'password'))
  .pipe(fs.createWriteStream('output.enc.gz'))
  .on('finish', () => {
    console.log('处理完成');
  });

pipeline() 更好的错误处理

javascript
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

// pipeline 会自动处理背压和错误
await pipelineAsync(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.gz')
);

// 任何流发生错误都会自动清理所有流
pipeline(
  fs.createReadStream('input.txt'),
  transform1,
  transform2,
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('Pipeline 失败:', err);
    } else {
      console.log('Pipeline 成功');
    }
  }
);

手动处理背压

监听 write() 返回值

javascript
const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  
  if (!canContinue) {
    // 缓冲区满,暂停读取
    readable.pause();
    console.log('暂停读取,等待 drain');
  }
});

writable.on('drain', () => {
  // 缓冲区清空,恢复读取
  console.log('恢复读取');
  readable.resume();
});

readable.on('end', () => {
  writable.end();
});

完整的背压处理示例

javascript
class BackpressureHandler {
  constructor(readable, writable) {
    this.readable = readable;
    this.writable = writable;
    this.isPaused = false;
  }
  
  start() {
    this.readable.on('data', (chunk) => {
      const canWrite = this.writable.write(chunk);
      
      if (!canWrite && !this.isPaused) {
        this.isPaused = true;
        this.readable.pause();
        console.log('触发背压,暂停读取');
      }
    });
    
    this.writable.on('drain', () => {
      if (this.isPaused) {
        this.isPaused = false;
        this.readable.resume();
        console.log('背压解除,恢复读取');
      }
    });
    
    this.readable.on('end', () => {
      this.writable.end();
    });
    
    this.readable.on('error', (err) => {
      console.error('读取错误:', err);
      this.writable.destroy(err);
    });
    
    this.writable.on('error', (err) => {
      console.error('写入错误:', err);
      this.readable.destroy(err);
    });
  }
}

const handler = new BackpressureHandler(
  fs.createReadStream('input.txt'),
  fs.createWriteStream('output.txt')
);
handler.start();

三、自定义 Stream 实现

自定义 Readable Stream

基本实现

javascript
const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(max, options) {
    super(options);
    this.max = max;
    this.current = 0;
  }
  
  _read(size) {
    // 当需要数据时调用
    if (this.current < this.max) {
      this.push(`${this.current}\n`);
      this.current++;
    } else {
      // 推送 null 表示流结束
      this.push(null);
    }
  }
}

const counter = new CounterStream(10);
counter.pipe(process.stdout);

异步数据源

javascript
class AsyncDataStream extends Readable {
  constructor(options) {
    super(options);
    this.index = 0;
  }
  
  async _read(size) {
    try {
      // 模拟异步获取数据
      const data = await this.fetchData(this.index);
      
      if (data) {
        this.push(data);
        this.index++;
      } else {
        this.push(null);
      }
    } catch (err) {
      this.destroy(err);
    }
  }
  
  async fetchData(index) {
    // 模拟 API 调用
    await new Promise(resolve => setTimeout(resolve, 100));
    return index < 10 ? `Data ${index}\n` : null;
  }
}

对象模式 Readable

javascript
class ObjectStream extends Readable {
  constructor(data, options) {
    super({ ...options, objectMode: true });
    this.data = data;
    this.index = 0;
  }
  
  _read(size) {
    if (this.index < this.data.length) {
      // 推送对象而不是 Buffer
      this.push(this.data[this.index]);
      this.index++;
    } else {
      this.push(null);
    }
  }
}

const stream = new ObjectStream([
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' }
]);

stream.on('data', (obj) => {
  console.log(obj); // { id: 1, name: 'Alice' }
});

自定义 Writable Stream

基本实现

javascript
const { Writable } = require('stream');

class ConsoleStream extends Writable {
  _write(chunk, encoding, callback) {
    // 处理数据块
    console.log(`[LOG] ${chunk.toString()}`);
    
    // 调用 callback 表示写入完成
    callback();
  }
}

const logger = new ConsoleStream();
logger.write('Hello World\n');
logger.end();

批量写入优化

javascript
class BatchWriter extends Writable {
  constructor(options) {
    super(options);
    this.batch = [];
    this.batchSize = 100;
  }
  
  _write(chunk, encoding, callback) {
    this.batch.push(chunk.toString());
    
    if (this.batch.length >= this.batchSize) {
      this.flush();
    }
    
    callback();
  }
  
  _final(callback) {
    // 流结束时调用,写入剩余数据
    this.flush();
    callback();
  }
  
  flush() {
    if (this.batch.length > 0) {
      console.log(`批量写入 ${this.batch.length} 条记录`);
      // 实际写入操作
      this.batch = [];
    }
  }
}

数据库写入流

javascript
class DatabaseWriter extends Writable {
  constructor(db, table, options) {
    super({ ...options, objectMode: true });
    this.db = db;
    this.table = table;
  }
  
  async _write(record, encoding, callback) {
    try {
      await this.db.insert(this.table, record);
      callback();
    } catch (err) {
      callback(err);
    }
  }
  
  async _writev(chunks, callback) {
    // 批量写入优化
    try {
      const records = chunks.map(chunk => chunk.chunk);
      await this.db.batchInsert(this.table, records);
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

自定义 Transform Stream

CSV 转 JSON 转换器

javascript
const { Transform } = require('stream');

class CSVToJSON extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.headers = null;
    this.buffer = '';
  }
  
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop();
    
    for (const line of lines) {
      if (!this.headers) {
        this.headers = line.split(',');
      } else {
        const values = line.split(',');
        const obj = {};
        this.headers.forEach((header, i) => {
          obj[header] = values[i];
        });
        this.push(obj);
      }
    }
    
    callback();
  }
  
  _flush(callback) {
    if (this.buffer && this.headers) {
      const values = this.buffer.split(',');
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header] = values[i];
      });
      this.push(obj);
    }
    callback();
  }
}

// 使用
fs.createReadStream('data.csv')
  .pipe(new CSVToJSON())
  .on('data', (obj) => {
    console.log(obj); // { name: 'Alice', age: '30' }
  });

数据验证转换流

javascript
class ValidatorStream extends Transform {
  constructor(schema, options) {
    super({ ...options, objectMode: true });
    this.schema = schema;
  }
  
  _transform(data, encoding, callback) {
    try {
      // 验证数据
      const validated = this.validate(data);
      this.push(validated);
      callback();
    } catch (err) {
      // 可以选择跳过无效数据或报错
      this.emit('invalid', { data, error: err });
      callback(); // 继续处理下一条
    }
  }
  
  validate(data) {
    // 简单验证逻辑
    if (typeof data.age !== 'number' || data.age < 0) {
      throw new Error('Invalid age');
    }
    return data;
  }
}

压缩统计流

javascript
class CompressionStats extends Transform {
  constructor(options) {
    super(options);
    this.bytesIn = 0;
    this.bytesOut = 0;
  }
  
  _transform(chunk, encoding, callback) {
    this.bytesIn += chunk.length;
    
    // 实际压缩(这里简化)
    const compressed = zlib.gzipSync(chunk);
    this.bytesOut += compressed.length;
    
    this.push(compressed);
    callback();
  }
  
  _flush(callback) {
    const ratio = ((1 - this.bytesOut / this.bytesIn) * 100).toFixed(2);
    console.log(`压缩率: ${ratio}%`);
    console.log(`原始: ${this.bytesIn} 字节`);
    console.log(`压缩后: ${this.bytesOut} 字节`);
    callback();
  }
}

四、Stream 性能优化

缓冲区大小调优

highWaterMark 对性能的影响

javascript
const fs = require('fs');

// 小缓冲区:频繁 I/O,CPU 开销大
const small = fs.createReadStream('large-file.txt', {
  highWaterMark: 1024 // 1KB
});

// 默认缓冲区:平衡性能
const normal = fs.createReadStream('large-file.txt', {
  highWaterMark: 64 * 1024 // 64KB(默认)
});

// 大缓冲区:减少 I/O 次数,但占用更多内存
const large = fs.createReadStream('large-file.txt', {
  highWaterMark: 1024 * 1024 // 1MB
});

性能测试

javascript
async function benchmarkBufferSize(filePath, bufferSize) {
  return new Promise((resolve) => {
    const start = Date.now();
    let chunks = 0;
    
    const readable = fs.createReadStream(filePath, {
      highWaterMark: bufferSize
    });
    
    readable.on('data', () => {
      chunks++;
    });
    
    readable.on('end', () => {
      const duration = Date.now() - start;
      resolve({ bufferSize, duration, chunks });
    });
  });
}

// 测试不同缓冲区大小
const sizes = [1024, 16 * 1024, 64 * 1024, 256 * 1024, 1024 * 1024];

for (const size of sizes) {
  const result = await benchmarkBufferSize('large-file.txt', size);
  console.log(`${size / 1024}KB: ${result.duration}ms, ${result.chunks} chunks`);
}

// 输出示例:
// 1KB: 1250ms, 10000 chunks
// 16KB: 850ms, 625 chunks
// 64KB: 720ms, 156 chunks
// 256KB: 680ms, 39 chunks
// 1024KB: 650ms, 10 chunks

根据场景选择缓冲区大小

javascript
// 网络流:较小缓冲区,降低延迟
const networkStream = net.createConnection({
  highWaterMark: 16 * 1024 // 16KB
});

// 文件流:较大缓冲区,提高吞吐量
const fileStream = fs.createReadStream('file.txt', {
  highWaterMark: 256 * 1024 // 256KB
});

// 对象流:按对象数量设置
const objectStream = new Readable({
  objectMode: true,
  highWaterMark: 100 // 100 个对象
});

对象模式 vs 二进制模式

二进制模式(默认)

javascript
const { Readable } = require('stream');

// 默认处理 Buffer/String
const binaryStream = new Readable({
  read() {
    this.push(Buffer.from('Hello'));
    this.push(null);
  }
});

binaryStream.on('data', (chunk) => {
  console.log(chunk); // <Buffer 48 65 6c 6c 6f>
  console.log(chunk instanceof Buffer); // true
});

对象模式

javascript
// 处理任意 JavaScript 对象
const objectStream = new Readable({
  objectMode: true,
  read() {
    this.push({ id: 1, name: 'Alice' });
    this.push({ id: 2, name: 'Bob' });
    this.push(null);
  }
});

objectStream.on('data', (obj) => {
  console.log(obj); // { id: 1, name: 'Alice' }
  console.log(typeof obj); // 'object'
});

混合使用

javascript
// 二进制输入 → 对象输出
class JSONParser extends Transform {
  constructor() {
    super({
      readableObjectMode: true,  // 输出对象
      writableObjectMode: false  // 输入二进制
    });
    this.buffer = '';
  }
  
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop();
    
    for (const line of lines) {
      try {
        this.push(JSON.parse(line));
      } catch (err) {
        // 忽略无效 JSON
      }
    }
    
    callback();
  }
}

fs.createReadStream('data.jsonl') // 二进制
  .pipe(new JSONParser())          // 输出对象
  .on('data', (obj) => {
    console.log(obj); // JavaScript 对象
  });

性能对比

javascript
// 对象模式:highWaterMark 表示对象数量
const objStream = new Readable({
  objectMode: true,
  highWaterMark: 16 // 缓冲 16 个对象
});

// 二进制模式:highWaterMark 表示字节数
const binStream = new Readable({
  objectMode: false,
  highWaterMark: 16 * 1024 // 缓冲 16KB
});

// 对象模式适合:
// - 数据库记录流
// - JSON 数据处理
// - 业务对象传输

// 二进制模式适合:
// - 文件 I/O
// - 网络传输
// - 大数据处理

面试高频题

1. Stream 的四种类型分别是什么?

四种类型

  1. Readable Stream(可读流)

    • 用于读取数据
    • 示例:fs.createReadStream()http.IncomingMessage
    • 核心方法:read()pipe()
    • 两种模式:流动模式(flowing)和暂停模式(paused)
  2. Writable Stream(可写流)

    • 用于写入数据
    • 示例:fs.createWriteStream()http.ServerResponse
    • 核心方法:write()end()
    • 关键事件:drain(缓冲区清空)、finish(写入完成)
  3. Duplex Stream(双工流)

    • 同时可读可写,读写操作独立
    • 示例:net.Socket、TCP 连接
    • 特点:有独立的读写缓冲区
  4. Transform Stream(转换流)

    • 特殊的 Duplex Stream,输出是输入的转换
    • 示例:zlib.createGzip()crypto.createCipher()
    • 核心方法:_transform()_flush()

关系图

Readable  Writable
    \      /
     Duplex
        |
    Transform

2. 什么是背压?如何处理?

背压定义: 当数据生产速度大于消费速度时,未处理的数据会在内存中堆积,导致内存溢出。背压机制通过暂停数据生产来防止这个问题。

产生原因

javascript
// 快速读取 + 慢速写入 = 内存堆积
const readable = fs.createReadStream('large-file.txt'); // 快
const writable = fs.createWriteStream('slow-device.txt'); // 慢

readable.on('data', chunk => {
  writable.write(chunk); // 忽略返回值,危险!
});

处理方法

  1. 使用 pipe()(推荐)
javascript
readable.pipe(writable); // 自动处理背压
  1. 使用 pipeline()(更好的错误处理)
javascript
const { pipeline } = require('stream');
pipeline(readable, transform, writable, (err) => {
  if (err) console.error('失败:', err);
});
  1. 手动处理
javascript
readable.on('data', chunk => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    readable.pause(); // 暂停读取
  }
});

writable.on('drain', () => {
  readable.resume(); // 恢复读取
});

关键点

  • write() 返回 false 表示缓冲区满
  • drain 事件表示缓冲区清空
  • pipe() 自动调用 pause()/resume()

3. pipe() 方法的实现原理?

简化实现

javascript
Readable.prototype.pipe = function(dest, options) {
  const src = this;
  
  // 1. 监听 data 事件,写入目标流
  src.on('data', (chunk) => {
    const canWrite = dest.write(chunk);
    
    // 2. 处理背压:写入缓冲区满时暂停读取
    if (!canWrite) {
      src.pause();
    }
  });
  
  // 3. 监听 drain 事件,恢复读取
  dest.on('drain', () => {
    src.resume();
  });
  
  // 4. 数据读取完毕,关闭目标流
  src.on('end', () => {
    if (options?.end !== false) {
      dest.end();
    }
  });
  
  // 5. 错误处理
  src.on('error', (err) => {
    dest.destroy(err);
  });
  
  // 6. 返回目标流,支持链式调用
  return dest;
};

核心机制

  1. 自动切换到流动模式:监听 data 事件
  2. 背压处理write() 返回 falsepause()
  3. 恢复流动drain 事件时 resume()
  4. 资源清理end 事件时关闭目标流
  5. 链式调用:返回目标流

使用示例

javascript
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'pwd'))
  .pipe(fs.createWriteStream('output.enc.gz'));

4. 如何实现一个自定义的 Transform Stream?

基本步骤

javascript
const { Transform } = require('stream');

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // 初始化状态
  }
  
  // 1. 实现 _transform 方法(必须)
  _transform(chunk, encoding, callback) {
    try {
      // 处理数据
      const transformed = this.process(chunk);
      
      // 推送转换后的数据
      this.push(transformed);
      
      // 调用 callback 表示完成
      callback();
    } catch (err) {
      callback(err);
    }
  }
  
  // 2. 实现 _flush 方法(可选)
  _flush(callback) {
    // 流结束时处理剩余数据
    if (this.buffer) {
      this.push(this.buffer);
    }
    callback();
  }
  
  process(chunk) {
    // 实际转换逻辑
    return chunk;
  }
}

实际案例:大写转换

javascript
class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// 使用
process.stdin
  .pipe(new UpperCaseTransform())
  .pipe(process.stdout);

关键点

  1. 继承 Transform 类
  2. 实现 _transform(chunk, encoding, callback)
    • chunk: 输入数据
    • encoding: 编码类型
    • callback: 完成回调,callback(err)callback()
  3. 使用 this.push() 输出数据
  4. 可选实现 _flush(callback) 处理剩余数据
  5. 支持对象模式super({ objectMode: true })

5. Stream 和一次性读取文件的性能差异?

内存占用对比

javascript
// 一次性读取 1GB 文件
const data = fs.readFileSync('1GB-file.txt');
// 内存占用:~1GB

// 流式读取 1GB 文件
fs.createReadStream('1GB-file.txt')
  .pipe(fs.createWriteStream('output.txt'));
// 内存占用:~64KB(缓冲区大小)

性能测试

javascript
// 测试 100MB 文件

// 方法 1:一次性读取
console.time('readFile');
const data = fs.readFileSync('100MB.txt');
fs.writeFileSync('output1.txt', data);
console.timeEnd('readFile');
// 时间:~500ms
// 内存峰值:~100MB

// 方法 2:Stream
console.time('stream');
fs.createReadStream('100MB.txt')
  .pipe(fs.createWriteStream('output2.txt'))
  .on('finish', () => {
    console.timeEnd('stream');
  });
// 时间:~550ms
// 内存峰值:~128KB

对比总结

特性一次性读取Stream
内存占用文件大小固定缓冲区(~64KB)
速度略快略慢
大文件可能 OOM无限大小
适用场景小文件(<10MB)大文件、实时处理
处理时机全部读取后边读边处理

使用建议

  • 小文件(<10MB):使用 readFile()/readFileSync()
  • 大文件(>10MB):使用 Stream
  • 实时处理:必须使用 Stream
  • 网络传输:必须使用 Stream

Stream 的优势

  1. 内存友好:固定缓冲区大小
  2. 无文件大小限制:可处理 GB 级文件
  3. 边读边处理:提高响应速度
  4. 组合性强:通过 pipe 链接多个操作