-
Notifications
You must be signed in to change notification settings - Fork 472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Node.js Readable Stream的实现简析 #27
Comments
已收到你的邮件,谢谢!
|
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Node.js Readable Stream的实现简析
Readable Stream
是对数据源的一种抽象。它提供了从数据源获取数据并缓存,以及将数据提供给数据消费者的能力。接下来分别通过
Readable Stream
的2种模式来学习下可读流是如何获取数据以及将数据提供给消费者的。Flowing模式
在
flowing
模式下,可读流自动从系统的底层读取数据,并通过EventEmitter
接口的事件提供给消费者。如果不是开发者需要自己去实现可读流,大家可使用最为简单的readable.pipe()
方法去消费数据。接下来我们就通过一个简单的实例去具体分析下
flowing
模式下,可读流是如何工作的。首先我们先来看下
Readable
构造函数的实现:在我们创建可读流实例时,传入了一个
read
方法,用以自定义从数据源获取数据的方法,如果是开发者需要自己去实现可读流,那么这个方法一定需要去自定义,否则在程序的运行过程中会报错。ReadableState
构造函数中定义了很多关于可读流的不同阶段的状态值:在上面的例子中,当实例化一个可读流
rs
后,调用可读流实例的pipe
方法。这正式开始了可读流在flowing
模式下从数据源开始获取数据,以及process.stdout
对数据的消费。Node提供的可读流有3种方式可以将初始态
flowing = null
的可读流转化为flowing = true
:data
事件stream.resume()
方法stream.pipe()
方法事实上这3种方式都回归到了一种方式上:
strean.resume()
,通过调用这个方法,将可读流的模式改变为flowing
态。继续回到上面的例子当中,在调用了rs.pipe()
方法后,实际上内部是调用了src.on('data', ondata)
监听data
事件,那么我们就来看下这个方法当中做了哪些工作。可读流监听
data
事件,并调用resume
方法:resume
方法会判断这个可读流是否处于flowing
模式下,同时在内部调用stream.read(0)
开始从数据源中获取数据(其中stream.read()方法根据所接受到的参数会有不同的行为):TODO: 这个地方可说明stream.read(size)方法接收到的不同的参数
这个时候可读流从数据源开始获取数据,调用
this._read(state.highWaterMark)
方法,对应着例子当中实现的read()
方法:在
read
方法当中有一个非常中的方法需要开发者自己去调用,就是stream.push
方法,这个方法即完成从数据源获取数据,并供消费者去调用。在
addChunk
方法中完成对数据的处理,这里需要注意的就是,在flowing
态下,数据被消耗的途径可能还不一样:这2种情况到底使用哪一种还要看开发者的是同步还是异步的去调用
push
方法,对应着state.sync
的状态值。当
push
方法被异步调用时,即state.sync
为false
:这个时候对于从数据源获取到的数据是直接通过触发data
事件以供消费者来使用,而不用存放到缓冲区。然后调用stream.read(0)
方法重复读取数据并供消费者使用。当
push
方法是同步时,即state.sync
为true
:这个时候从数据源获取数据后,就不是直接通过触发data
事件来供消费者直接使用,而是首先上数据缓冲到可读流的缓冲区。这个时候你看代码可能会疑惑,将数据缓存起来后,那么在flowing
模式下,是如何流动起来的呢?事实上在一开始调用resume_
方法时:在
flow
方法内部调用stream.read()
方法取出可读流缓冲区的数据供消费者使用,同时继续调用stream.read(0)
来继续从数据源获取数据。以上就是在flowing模式下,可读流是如何完成从数据源获取数据并提供给消费者使用的大致流程。
paused模式
在
pasued
模式下,消费者如果要获取数据需要手动调用stream.read()
方法去获取数据。举个例子:
通过监听
readable
事件,开始出发可读流从数据源获取数据。在
nReadingNextTick
当中调用self.read(0)
方法后,后面的流程和上面分析的flowing模式的可读流从数据源获取数据的流程相似,最后都要调用addChunk
方法,将数据获取到后推入可读流的缓冲区:一旦有数据被加入到了缓冲区,且
needReadable
(这个字段表示是否需要触发readable
事件用以通知消费者来消费数据)为true
,这个时候会触发readable
告诉消费者有新的数据被push
进了可读流的缓冲区。此外还会调用maybeReadMore
方法,异步的从数据源获取更多的数据:每当可读流有新的数据被推进缓冲区,触发
readable
事件后,消费者通过调用stream.read()
方法来从可读流中获取数据。背压
当数据消费消费数据的速度慢于可写流提供给消费者的数据后会产生背压。
还是通过
pipe
管道来看:当
dest.write(chunk)
返回false
的时候,即代表可读流给可写流提供的数据过快,这个时候调用src.pause
方法,暂停flowing
状态,同步也暂停可写流从数据源获取数据以及向可写流输入数据。这个时候只有当可写流触发drain
事件时,会调用ondrain
来恢复flowing
,同时可读流继续向可写流输入数据。关于可写流的背压可参见关于Writable_stream的源码分析。以上就是通过可读流的2种模式分析了下可读流的内部工作机制。当然还有一些细节处大家有兴趣的话可以阅读相关的源码。
The text was updated successfully, but these errors were encountered: