Node.js Stream 流处理:处理大文件的正确姿势
全面解析 Node.js Stream 流处理机制,掌握 Readable、Writable、Transform 流的用法、背压机制与 pipeline API。
有一次需要在 Node.js 里处理一个 3GB 的日志文件,我直接用 fs.readFile 一次性读进内存,结果 Node 进程直接 OOM(Out of Memory)崩掉了。后来改用 Stream,内存占用稳定在 50MB 以下,处理速度还快了一倍。那次之后,Stream 成了我处理大数据量任务的首选方案。
Stream 是 Node.js 里最强大但也最容易被忽视的功能之一。很多前端转 Node 的同学上来就 readFile + writeFile,小文件没问题,等遇到几百 MB 的 CSV 导出、视频转码、大批量数据迁移时就傻眼了。Stream 的学习曲线确实比较陡,特别是背压(backpressure)这个概念,但一旦搞懂了,你会发现它才是处理 I/O 的正确抽象。
1. 为什么需要 Stream?
传统的文件读写方式是「一次性读取、一次性写入」,整个文件内容必须先全部加载到内存中。当文件很大时,这种方式的问题是致命的。
Buffer 模式(一次性读取)
const data = fs.readFileSync('big.csv')
// 整个文件加载到内存
// 2GB 文件 = 2GB 内存占用
process(data)
fs.writeFileSync('out.csv', result)- • 内存占用 = 文件大小
- • 必须等完全读取才能开始处理
- • 大文件直接 OOM
Stream 模式(分块处理)
const read = fs.createReadStream('big.csv')
const write = fs.createWriteStream('out.csv')
// 每次只处理一小块数据
read.pipe(transformStream).pipe(write)- • 内存占用恒定(通常几十 MB)
- • 边读边处理边写,时间重叠
- • 理论上可处理任意大小的文件
2. 四种 Stream 类型详解
Node.js 中有四种基本的 Stream 类型,理解它们各自的职责是用好 Stream 的前提。
Readable Stream(可读流)
数据的来源。常见的有:fs.createReadStream、http.IncomingMessage(请求体)、process.stdin。
const { Readable } = require('stream')
// 自定义可读流
const myReadable = new Readable({
read(size) {
this.push('hello ')
this.push('world')
this.push(null) // 表示数据结束
}
})
myReadable.on('data', chunk => console.log(chunk.toString()))
myReadable.on('end', () => console.log('读取完毕'))Writable Stream(可写流)
数据的去向。常见的有:fs.createWriteStream、http.ServerResponse(响应体)、process.stdout。
const writable = fs.createWriteStream('output.txt')
writable.write('第一行数据\n')
writable.write('第二行数据\n')
writable.end('最后一行') // end 后不能再写入
writable.on('finish', () => console.log('写入完成'))Transform Stream(转换流)
既可读又可写,在读取和写入之间对数据进行转换。常见的有:zlib.createGzip、crypto.createCipheriv。
const { Transform } = require('stream')
// 将文本转为大写的转换流
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase())
callback()
}
})
process.stdin.pipe(upperCase).pipe(process.stdout)Duplex Stream(双工流)
读和写是独立的两个通道(不像 Transform 那样有转换关系)。典型例子是 net.Socket(TCP 套接字),可以同时收发数据。
3. 背压(Backpressure):被忽略的关键概念
背压是 Stream 中最重要但也最容易被忽略的概念。当数据生产速度大于消费速度时(比如读取速度远快于写入速度),如果不处理背压,数据就会在内存中堆积,最终导致内存溢出。
错误示例:忽略背压
// 危险!没有处理背压
const readable = fs.createReadStream('huge-file.dat')
const writable = fs.createWriteStream('output.dat')
readable.on('data', (chunk) => {
// write() 返回 false 表示内部缓冲区已满
// 但这里完全忽略了返回值
writable.write(chunk)
})正确做法一:手动处理背压
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk)
if (!canContinue) {
// 写入缓冲区满了,暂停读取
readable.pause()
// 等缓冲区排空后恢复读取
writable.once('drain', () => readable.resume())
}
})正确做法二:直接用 pipe(自动处理背压)
// pipe 内部已经实现了背压处理
readable.pipe(writable)4. Pipeline API:现代 Stream 的最佳实践
stream.pipeline() 是 Node.js 10 引入的 API,解决了 pipe() 的两个痛点:错误处理不完善和流没有正确清理。
pipe 的问题
// 如果中间某个流出错
// 其他流不会被自动销毁
// 可能导致内存泄漏
readStream
.pipe(gzip)
.pipe(writeStream)pipeline 的优势
const { pipeline } = require('stream/promises')
await pipeline(
fs.createReadStream('input.log'),
zlib.createGzip(),
fs.createWriteStream('input.log.gz')
)
// 自动处理错误和流清理一个完整的实战例子——逐行处理大型 CSV 文件并输出 JSON:
const { pipeline } = require('stream/promises')
const { Transform } = require('stream')
const { createReadStream, createWriteStream } = require('fs')
const readline = require('readline')
async function csvToJson(inputPath, outputPath) {
const rl = readline.createInterface({
input: createReadStream(inputPath),
crlfDelay: Infinity
})
const output = createWriteStream(outputPath)
let headers = null
let first = true
output.write('[\n')
for await (const line of rl) {
const fields = line.split(',')
if (!headers) {
headers = fields
continue
}
const obj = {}
headers.forEach((h, i) => obj[h.trim()] = fields[i]?.trim())
if (!first) output.write(',\n')
output.write(' ' + JSON.stringify(obj))
first = false
}
output.write('\n]\n')
output.end()
}
csvToJson('data.csv', 'data.json')5. 实战场景与性能对比
以下是几个典型的 Stream 应用场景和需要注意的性能细节:
场景一:HTTP 文件下载
// 不要先下载完再发送,直接 pipe
app.get('/download/:file', (req, res) => {
const filePath = path.join(__dirname, 'files', req.params.file)
res.setHeader('Content-Disposition', 'attachment')
fs.createReadStream(filePath).pipe(res)
// 用户立即开始接收数据,无需等待服务端读完整个文件
})场景二:实时日志处理
// 过滤 ERROR 级别日志并压缩归档
await pipeline(
fs.createReadStream('app.log'),
new Transform({
transform(chunk, enc, cb) {
const lines = chunk.toString().split('\n')
const errors = lines.filter(l => l.includes('ERROR'))
cb(null, errors.join('\n') + '\n')
}
}),
zlib.createGzip(),
fs.createWriteStream('errors.log.gz')
)场景三:highWaterMark 调优
highWaterMark 控制流的内部缓冲区大小,默认为 16KB(对象模式下为 16 个对象)。根据场景调整可以显著影响性能:
- • 大文件顺序读写:增大到 64KB-1MB,减少系统调用次数
- • 网络流:保持默认或适当减小,降低内存占用
- • 实时处理:减小缓冲区,降低数据延迟
在线数据处理
日常开发中经常需要处理 JSON、CSV 等数据格式。试试 OneKit 的 JSON 格式化工具,支持格式化、压缩、校验等功能,所有处理在浏览器本地完成,无需担心数据安全。