原文地址: http://www.moye.me/2015/03/29/streaming_in_node/
说到流,就涉及到一个*nix的概念: 管道 ——在*nix中,流在Shell中被实现为可以通过 |(管道符) 进行桥接的数据,一个进程的输出(stdout)可被直接作为下一个进程的输入(stdin)。
在Node中,流( Stream )的概念与之类似,代表一种数据流可供桥接的能力。
流化的精髓在于 .pipe()方法。可供桥接的能力,在于数据流的两端(上游/下游 或称为 读/写流)以一个 .pipe()方法进行桥接。
伪代码的表现形式为:
//上游.pipe(下游) Readable.pipe(Writable);
这里并不打算讨论所谓的Node v0.4 之前的“经典”流。那么,流分为这么几类(皆为抽象接口:
简单来说:
有没有一丝丝焦虑?别急,做为一个说人话的低级码工,我会把Stream掰开了和您扯一扯的。
Stream类,在 Node.js的源码 里,是这么定义的:
var EE = require('events').EventEmitter; var util = require('util'); util.inherits(Stream, EE); function Stream() { EE.call(this); }
可以看出,本质上,Stream是一个EventEmitter,那意味着它具备事件驱动的功能(.emit/.on...)。众所周知,“Node.js 就是基于V8的事件驱动平台”,实现了事件驱动的流式编程,具备了和Node一样的异步回调的特征。
比如在 Readable 流中,有一个 readable 事件,在一个暂停的只读流中,只要有数据块准备好可读时,它就会被发送给订阅者(Readable 流有哪些呢?express中的 req,ftp或者mutli-form上传组件的req.part,系统中的标准输入 process.stdin等)。有了readable 事件,我们可以做个处理shell 命令输出的分析器之类的工具:
process.stdin.on('readable', function(){ var buf = process.stdin.read(); if(buf){ var data = buf.toString(); // parsing data ... } });
这样调用:
head -10 some.txt | node parser.js
对于 Readable 流,我们还可以订阅它的 data 和 end 事件,以获取数据块并在流枯竭时获得通知,如 经典socket示例 中那样:
req.on('connect', function(res, socket, head) { socket.on('data', function(chunk) { console.log(chunk.toString()); }); socket.on('end', function() { proxy.close(); }); });
需要注意的是,Readable 流有两种状态:flowing mode(激流) 和 pause mode(暂停)。前者根本停不下来,谁被pipe上了就马上不停的给;后者会暂停,直到下游显式的调用 Stream.read() 请求才读取数据块。Readable 流初始化时是 pause mode的。
这两种状态可以互为切换的,其中,
有以下任一行为,pause 转 flowing:
有以下任一行为,flowing 转回 pause:
结合流的异步特性,我可以写出这样的应用:直接将 用户A 的输出桥接到 用户B 的页面上输出:
router.post('/post', function(req, res) { var destination = req.headers['destination']; //发给谁 cache[destionation] = req; //是的,并不返回,所以最好是个ajax请求 });
用户B请求的时候:
router.get('/inbox', function(req, res){ var user = req.headers['user']; cache.find(user, function(err, previousReq){ //找到之前存的req var form = new multiparty.Form(); form.parse(previousReq); // 有文件给我 form.on('part', function (part) { part.pipe(res); //流式大法好:) part.on('error', function (err) { console.log(err); messaging.setRequestDone(uniqueID); return res.end(err); }); }); }); });
更多文章请移步我的blog新地址: http://www.moye.me/