Skip to content

Latest commit

 

History

History
157 lines (129 loc) · 4.46 KB

netty-channel-pipeline.md

File metadata and controls

157 lines (129 loc) · 4.46 KB

ChannelPipeline

summary

这里从下面几点分析:

  1. ChannelPipeline 什么时候初始化
  2. 如何进行进行流处理

读事件的流程触发点

  1. 首页要明确,读事件是从NioEventLoop#processSelectedKey触发的(NioEventLoop负责所有读写事件的转发)
  2. 然后这个事件被转发到AbstractNioChannel.NioUnsafe这个类
  3. NioServerSocketChannel -> AbstractNioChannel -> AbstractChannel 这个三个类存在继承关系,因此可以在AbstractNioChannel中获取pipeline,pipeline开始进行事件的转发
  4. pipeline从链头部(HeadContext),开始进行读事件的处理
  5. 进入自定义的 ChannelHandler,如SimpleChannelInboundHandler

init

默认实现 DefaultChannelPipeline

DefaultChannelPipeline的初始化

Channel 在初始化的时候,会进行unsafepipeline 的初始化,代码如下:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

读事件触发的代码:

DefaultChannelPipeline#fireChannelRead

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    // head 代表这个pipeline链中的第一个,进行读事件的流转
    // head 就是HeadContext
    // msg 是已经读取的原始数据(byte数据)
    AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
    return this;
}

head是在DefaultChannelPipeline初始化的时候生成的,代码如下:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

headHeadContext

AbstractChannelHandlerContext#invokeChannelRead // 1⃣️

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // 这里的 this 就是 HeadContext
            // 当下次在调这个方法的时候,这个 this 会指向 HeadContext.next
            // 比如 SimpleChannelInboundHandler
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

HeadContext

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 这里 HeadContext 什么都不做,只是把 msg 传递到下个handler
    ctx.fireChannelRead(msg);
}

AbstractChannelHandlerContext#fireChannelRead // 2⃣️

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // findContextInbound() 这个方法从head 开始找下一个context
    // 比如找到了 SimpleChannelInboundHandler
    invokeChannelRead(findContextInbound(), msg);
    return this;
}
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

这里来看SimpleChannelInboundHandler的处理

SimpleChannelInboundHandler#channelRead // 3⃣️

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            // false 不释放msg,把这个msg 给其他handler处理
            release = false;
            // 这里把这个 msg 传递给下一个ChannelHandler
            // 会调用 AbstractChannelHandlerContext#fireChannelRead 找到下一个handler
            // findContextInbound 找到下一个handler 回到了2⃣️步骤
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }
    }
}

AbstractChannelHandlerContext 用来维护这个 pipeline 链

private void addFirst0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext nextCtx = head.next;
    newCtx.prev = head;
    newCtx.next = nextCtx;
    head.next = newCtx;
    nextCtx.prev = newCtx;
}