Skip to content

Latest commit

 

History

History
140 lines (115 loc) · 4.89 KB

eventloop.md

File metadata and controls

140 lines (115 loc) · 4.89 KB

EventLoop

EventLoop作为Netty的Reactor线程模型,极大提高了Netty应用的性能。

常用实现类NioEventLoop类图图下

服务端代码通常如下,会使用一个BossEventLoopGroup和一个WorkerEventLoopGroup,new NioEventLoopGroup()的时候会创建多个EventLoop

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(xxxHandler);
                }
            });

    ChannelFuture f = b.bind(port).sync();
    f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

ServerBootStrap会通过以下代码,添加一个叫做ServerBootstrapAcceptor的ChannelHandler,在“Accept”到客户端连接的时候,会创建一个SocketChannel用于和客户端交互,并在调用ServerBootstrapAcceptor的channelRead()的时候作为消息传入,在channelRead方法中会调用childGroup(WorkerEventLoopGroup)的register(),childGroup.register()又会调用next().register(),WorkerEventLoopGroup的next()会返回一个EventLoop(在创建WorkerEventLoopGroup的时候,就创建了多个EventLoop,next()会返回这些EventLoop中的一个,当创建的SocketChannel太多的时候,可能会多个SocketChannel对应一个EventLoop),该EventLoop的register()会调用创建的SocketChannel的register()设置eventLoop为该SocketChannel的eventLoop,然后eventLoop用于该SocketChannel的IO事件处理。

ServerBootStrap.java

@Override
void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

ServerBootstrapAcceptor.java

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

AbstractChannel.java

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}