Node.js中你不可不精的Stream(流)
一、什么是Stream(流)
流(stream)在Node.js中是处理流数据的抽象接口(abstractinterface)。stream模块提供了基础的API。使用这些API可以很容易地来构建实现流接口的对象。例如,HTTP请求和process.stdout就都是流的实例。
流可以是可读的、可写的,或是可读写的。注意,所有的流都是EventEmitter的实例。
二、流的类型
Node.js中有四种基本的流类型:
- Readable-可读的流(例如fs.createReadStream())。
- Writable-可写的流(例如fs.createWriteStream())。
- Duplex-可读写的流(双工流)(例如net.Socket)。
- Transform-在读写过程中可以修改和变换数据的Duplex流(例如zlib.createDeflate())。
varStream=require('stream')//stream模块引入方式
varReadable=Stream.Readable//可读的流
varWritable=Stream.Writable//可写的流
varDuplex=Stream.Duplex//可读写的流
varTransform=Stream.Transform//在读写过程中可以修改和变换数据的Duplex流
Node.js中关于流的操作被封装到了Stream模块中,这个模块也被多个核心模块所引用。例如在fs.createReadStream()和fs.createWriteStream()的源码实现里,都调用了Stream模块提供的抽象接口来实现对流数据的操作。
三、为什么使用Stream?
我们通过两个例子,了解一下为什么要使用Stream。
Exp1:
下面是一个读取文件内容的例子:
constfs=require('fs')
fs.readFile(file,function(err,content){//读出来的content是Buffer
console.log(content)
console.log(content.toString())
})
但如果文件内容较大,譬如在500M时,执行上述代码的输出为:
buffer.js:382 thrownewError('toStringfailed'); ^ Error:toStringfailed atBuffer.toString(buffer.js:382:11)
报错的原因是content这个Buffer对象的长度过大,导致toString方法失败。
可见,这种一次获取全部内容的做法,不适合操作大文件。
可以考虑使用流来读取文件内容。
varfs=require('fs')
fs.createReadStream(bigFile).pipe(process.stdout)
fs.createReadStream创建一个可读流,连接了源头(上游,文件)和消耗方(下游,标准输出)。
执行上面代码时,流会逐次调用fs.read(ReadStream这个类的源码里有一个_read方法,这个_read方法在内部调用了fs.read来实现对文件的读取),将文件中的内容分批取出传给下游。
在文件看来,它的内容被分块地连续取走了。
在下游看来,它收到的是一个先后到达的数据序列。
如果不需要一次操作全部内容,它可以处理完一个数据便丢掉。
在流看来,任一时刻它都只存储了文件中的一部分数据,只是内容在变化而已。
这种情况就像是用水管去取池子中的水。
每当用掉一点水,水管便会从池子中再取出一点。
无论水池有多大,都只存储了与水管容积等量的水。
Exp2:
下面是一个在线看视频的例子,假定我们通过HTTP请求返回视频内容给用户
consthttp=require('http');
constfs=require('fs');
http.createServer((req,res)=>{
fs.readFile(videoPath,(err,data)=>{
res.end(data);
});
}).listen(8080);
但这样有两个明显的问题
- 视频文件需要全部读取完,才能返回给用户,这样等待时间会很长。
- 视频文件一次全放入内存中,内存吃不消。
用流可以将视频文件一点一点读到内存中,再一点一点返回给用户,读一部分,写一部分。(利用了HTTP协议的Transfer-Encoding:chunked分段传输特性),用户体验得到优化,同时对内存的开销明显下降。
consthttp=require('http');
constfs=require('fs');
http.createServer((req,res)=>{
fs.createReadStream(videoPath).pipe(res);
}).listen(8080);
通过上述两个例子,我们知道,在大数据情况下必须使用流式处理。
四、可读流(ReadableStream)
可读流(Readablestreams)是对提供数据的源头(source)的抽象。
常见的可读流:
- HTTPresponses,ontheclient
- HTTPrequests,ontheserver
- fsreadstreams
- TCPsockets//sockets是一个双工流,即可读可写的流
- process.stdin//标准输入
所有的ReadableStream都实现了stream.Readable类定义的接口。
可读流的两种模式(flowing和paused)
- 在flowing模式下,可读流自动从系统底层读取数据,并通过EventEmitter接口的事件尽快将数据提供给应用(所有的流都是EventEmitter的实例)。
- 在paused模式下,必须显式调用stream.read()方法来从流中读取数据片段。
创建流的Readable流,默认是非流动模式(paused模式),默认不会读取数据。所有初始工作模式为paused的Readable流,可以通过下面三种途径切换为flowing模式:
- 监听'data'事件
- 调用stream.resume()方法
- 调用stream.pipe()方法将数据发送到Writable
fs.createReadStream(path[,options])源码实现
//文件名ReadStream.js
letfs=require('fs');//读取文件
letEventEmitter=require('events');
classReadStreamextendsEventEmitter{//流操作都是基于事件的
constructor(path,options={}){
super();
//需要的参数
this.path=path;//读取文件的路径
this.highWaterMark=options.highWaterMark||64*1024;//缓冲区大小,默认64KB
this.autoClose=options.autoClose||true;//是否需要自动关闭文件描述符,默认为true
this.start=options.start||0;//options可以包括start和end值,使其可以从文件读取一定范围的字节而不是整个文件
this.pos=this.start;//从文件的那个位置开始读取内容,pos会随着读取的位置而改变
this.end=options.end||null;//null表示没传递
this.encoding=options.encoding||null;
this.flags=options.flags||'r';//以何种方式操作文件
//参数的问题
this.flowing=null;//默认为非流动模式
//建一个buffer存放读出来的数据
this.buffer=Buffer.alloc(this.highWaterMark);
this.open();
//{newListener:[fn]}
//次方法默认同步调用的
this.on('newListener',(type)=>{//等待着它监听data事件
if(type==='data'){//当监听到data事件时,把流设置为流动模式
this.flowing=true;
this.read();//开始读取客户已经监听了data事件
}
})
}
pause(){//将流从flowing模式切换为paused模式
this.flowing=false;
}
resume(){//将流从paused模式切换为flowing模式
this.flowing=true;
this.read();//将流从paused模式切换为flowing模式后,继续读取文件内容
}
read(){//默认第一次调用read方法时还没有获取fd,文件的打开是异步的,所以不能直接读
if(typeofthis.fd!=='number'){//如果fd不是number类型,证明文件还没有打开,此时需要监听一次open事件,因为文件一打开,就会触发open事件,这个在this.open()里写了
returnthis.once('open',()=>this.read());//等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法
}
//当获取到fd时开始读取文件了
//第一次应该读2个第二次应该读2个
//第二次pos的值是4end是4
//读取文件里一共4有个数为1234,我们读取里面的1234
lethowMuchToRead=this.end?Math.min(this.end-this.pos+1,this.highWaterMark):this.highWaterMark;//规定每次读取多少个字节
fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(error,byteRead)=>{//byteRead为真实的读到了几个字节的内容
//读取完毕
this.pos+=byteRead;//读出来两个,pos位置就往后移两位
//this.buffer默认就是三个
letb=this.encoding?this.buffer.slice(0,byteRead).toString(this.encoding):this.buffer.slice(0,byteRead);//对读出来的内容进行编码
this.emit('data',b);//触发data事件,将读到的内容输出给用户
if((byteRead===this.highWaterMark)&&this.flowing){
returnthis.read();//继续读
}
//这里就是没有更多的逻辑了
if(byteRead{
//如果文件打开过了那就关闭文件并且触发close事件
this.emit('close');
});
}
open(){
fs.open(this.path,this.flags,(err,fd)=>{//fd是文件描述符,它标识的就是当前this.path这个文件,从3开始(number类型)
if(err){
if(this.autoClose){//如果需要自动关闭我再去销毁fd
this.destroy();//销毁(关闭文件,触发关闭事件)
}
this.emit('error',err);//如果有错误触发error事件
return;
}
this.fd=fd;//保存文件描述符
this.emit('open',this.fd);//文件被打开了,触发文件被打开的方法
});
}
pipe(dest){//管道流的实现pipe()方法是ReadStream下的方法,它里面的参数是WritableStream
this.on('data',(data)=>{
letflag=dest.write(data);
if(!flag){//这个flag就是每次调用ws.write()后返回的读状态值
this.pause();//已经不能继续写了,等他写完了再恢复
}
});
dest.on('drain',()=>{//当读取缓存区清空后
console.log('写一下停一下')
this.resume();//继续往dest写入数据
});
}
}
module.exports=ReadStream;//导出可读流
使用fs.createReadStream()
//流:有序的有方向的,可以自己控制速率
//读:读是将内容读取到内存中
//写:写是将内存或者文件的内容写入到文件内
//读取的时候默认读默认一次读取64k,encoding读取出来的内容默认都是buffer
//letfs=require('fs');
//letrs=fs.createReadStream({...});//原生实现可读流
letReadStream=require('./ReadStream');
letrs=newReadStream('./2.txt',{
highWaterMark:3,//字节
flags:'r',//读文件
autoClose:true,//默认读取完毕后自动关闭文件描述符
start:0,
//end:3,//流是闭合区间包start也包end
encoding:'utf8'
});
//默认创建一个流是非流动模式(上述源码中有写的),默认不会读取数据
//如果我们需要接收数据,那我们要监听data事件,这样数据会自动的流出来
rs.on('error',function(err){//通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。
console.log(err)
});
rs.on('open',function(){//文件被打开了,获取到了fd。内部会自动的触发这个事件rs.emit('data');
console.log('文件打开了');
});
rs.on('data',function(data){//有数据流出来了
console.log(data);
rs.pause();//暂停触发on('data')事件,将流动模式又转化成了非流动模式
});
setTimeout(()=>{rs.resume()},3000);//三秒钟之后再将非流动模式转化为流动模式
rs.on('end',function(){//读取完毕
console.log('读取完毕了');
});
rs.on('close',function(){//close事件将在流或其底层资源(比如一个文件)关闭后触发。close事件触发后,该流将不会再触发任何事件。
//console.log('关闭')
});
四、可写流(WritableStream)
可写流是对数据流向设备的抽象,用来消费上游流过来的数据,通过可写流程序可以把数据写入设备,常见的是本地磁盘文件或者TCP、HTTP等网络响应。
常见的可写流:
- HTTPrequests,ontheclient
- HTTPresponses,ontheserver
- fswritestreams
- zlibstreams
- cryptostreams
- TCPsockets
- childprocessstdin
- process.stdout,process.stderr
所有Writable流都实现了stream.Writable类定义的接口。
可写流的使用
调用可写流实例的write()方法就可以把数据写入可写流
constfs=require('fs');
constrs=fs.createReadStream(sourcePath);
constws=fs.createWriteStream(destPath);
rs.setEncoding('utf-8');//设置编码格式
rs.on('data',chunk=>{
ws.write(chunk);//写入数据
});
监听了可读流的data事件就会使可读流进入流动模式,我们在回调事件里调用了可写流的write()方法,这样数据就被写入了可写流抽象的设备destPath中。
write()方法有三个参数
- chunk{String|Buffer},表示要写入的数据
- encoding当写入的数据是字符串的时候可以设置编码
- callback数据被写入之后的回调函数
drain事件
如果调用stream.write(chunk)方法返回false,表示当前缓存区已满,流将在适当的时机(缓存区清空后)触发drain事件。
constfs=require('fs');
constrs=fs.createReadStream(sourcePath);
constws=fs.createWriteStream(destPath);
rs.setEncoding('utf-8');//设置编码格式
rs.on('data',chunk=>{
letflag=ws.write(chunk);//写入数据
if(!flag){//如果缓存区已满暂停读取
rs.pause();
}
});
ws.on('drain',()=>{
rs.resume();//缓存区已清空继续读取写入
});
fs.createWriteStream(path[,options])源码实现
//文件WriteStream.js
letfs=require('fs');
letEventEmitter=require('events');
classWriteStreamextendsEventEmitter{
constructor(path,options={}){
super();
this.path=path;
this.flags=options.flags||'w';
this.encoding=options.encoding||'utf8';
this.start=options.start||0;
this.pos=this.start;
this.mode=options.mode||0o666;
this.autoClose=options.autoClose||true;
this.highWaterMark=options.highWaterMark||16*1024;
this.open();//fd异步的//触发一个open事件,当触发open事件后fd肯定就存在了
//写文件的时候需要的参数有哪些
//第一次写入是真的往文件里写
this.writing=false;//默认第一次就不是正在写入
//用简单的数组来模拟一下缓存
this.cache=[];
//维护一个变量,表示缓存的长度
this.len=0;
//是否触发drain事件
this.needDrain=false;
}
clearBuffer(){
letbuffer=this.cache.shift();
if(buffer){//如果缓存里有
this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer());
}else{//如果缓存里没有了
if(this.needDrain){//需要触发drain事件
this.writing=false;//告诉下次直接写就可以了不需要写到内存中了
this.needDrain=false;
this.emit('drain');
}
}
}
_write(chunk,encoding,clearBuffer){//因为write方法是同步调用的此时fd还没有获取到,所以等待获取到再执行write操作
if(typeofthis.fd!='number'){
returnthis.once('open',()=>this._write(chunk,encoding,clearBuffer));
}
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
this.pos+=byteWritten;
this.len-=byteWritten;//每次写入后就要在内存中减少一下
clearBuffer();//第一次就写完了
})
}
write(chunk,encoding=this.encoding){//客户调用的是write方法去写入内容
//要判断chunk必须是buffer或者字符串为了统一,如果传递的是字符串也要转成buffer
chunk=Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
this.len+=chunk.length;//维护缓存的长度3
letret=this.lenthis.clearBuffer());//专门实现写的方法
}
returnret;//能不能继续写了,false表示下次的写的时候就要占用更多内存了
}
destroy(){
if(typeofthis.fd!='number'){
this.emit('close');
}else{
fs.close(this.fd,()=>{
this.emit('close');
});
}
}
open(){
fs.open(this.path,this.flags,this.mode,(err,fd)=>{
if(err){
this.emit('error',err);
if(this.autoClose){
this.destroy();//如果自动关闭就销毁文件描述符
}
return;
}
this.fd=fd;
this.emit('open',this.fd);
});
}
}
module.exports=WriteStream;
使用fs.createWriteStream()
//可写流有缓存区的概念
//1.第一次写入是真的向文件里写,第二次在写入的时候是放到了缓存区里
//2.写入时会返回一个boolean类型,返回为false时表示缓存区满了,不要再写入了
//3.当内存和正在写入的内容消耗完后,会触发一个drain事件
//letfs=require('fs');
//letrs=fs.createWriteStream({...});//原生实现可写流
letWS=require('./WriteStream')
letws=newWS('./2.txt',{
flags:'w',//写入文件,默认文件不存在会创建
highWaterMark:1,//设置当前缓存区的大小
encoding:'utf8',//文件里存放的都是二进制
start:0,
autoClose:true,//自动关闭文件描述符
mode:0o666,//可读可写
});
//drain的触发时机,只有当highWaterMark填满时,才可能触发drain
//当嘴里的和地下的都吃完了,就会触发drain方法
leti=9;
functionwrite(){
letflag=true;
while(flag&&i>=0){
i--;
flag=ws.write('111');//987//654//321//0
console.log(flag)
}
}
write();
ws.on('drain',function(){
console.log('dry');
write();
});
总结
stream(流)分为可读流(flowingmode和pausedmode)、可写流、可读写流,Node.js提供了多种流对象。例如,HTTP请求和process.stdout就都是流的实例。stream模块提供了基础的API。使用这些API可以很容易地来构建实现流接口的对象。它们底层都调用了stream模块并进行封装。
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。