189 8069 5689

Netty线程模型、Future、Channel总结和源码分析-创新互联

Netty线程模型

可根据需要配置线程模型:单线程Reactor、多线程Reactor、多层线程Reactor

在舟曲等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供成都网站设计、网站制作 网站设计制作定制网站设计,公司网站建设,企业网站建设,品牌网站设计,成都全网营销推广,成都外贸网站建设,舟曲网站建设费用合理。

无论几个线程,都通过单一的Acceptor接收客户端请求,可以创建更多的NioEventLoop来处理IO操作。

EventLoop和EventLoopGroup实际继承了Java的ScheduledExecutorService,使其具备了线程池的特性,其线程数量可动态配置。例如配置单线程模型,设置线程数量为1即可。

Future和Promise
Future

Future即异步操作
future操作可以被close,但结果是未知的;调用get可以获取操作结果,但是会被阻塞;isDone可判断是否完成操作。
ChannelFuture是为了获取异步返回结果而设计
可以通过ChannelFutureListener接口获得回调,无需等待get方法返回。

public interface ChannelFutureListener extends GenericFutureListener {
    ChannelFutureListener CLOSE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };
    ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().close();
            }

        }
    };
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().pipeline().fireExceptionCaught(future.cause());
            }

        }
    };
}

连接超时和channel超时配置
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
channelFutrue.awaitUninterruptibly(10, TimeUnit.SECONDS);

注意:
1、谨慎调用await,可能导致死锁。
2、ChannelFuture超时后如果调用了业务代码重连,而此时IO未超时,将可能导致多条连接并存,设置IO超时时间建议小于业务代码超时时间。

promise

升级版的future,可写可操作(对回调过程)。future好比古代飞鸽传书,只能等鸽子回来或者不回来,不可控;promise就像现代快递员,送快递送一半可以打电话给他叫他不要送了或者中途请他帮忙买个饼。
例如:
DefaultPromise类
awaitUninterruptibly()可手动打断回调,使进程等待。

 public Promise awaitUninterruptibly() {
        if (this.isDone()) {
            return this;
        } else {
            boolean interrupted = false;
            synchronized(this) {
               while(!this.isDone()) {
                    this.checkDeadLock();
                    this.incWaiters();

                    try {
                        this.wait();
                    } catch (InterruptedException var9) {
                        interrupted = true;
                    } finally {
                        this.decWaiters();
                    }
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }

            return this;
        }
    }

进行了死锁判断,避免已存在相同任务;并限制了大等待数量32767

protected void checkDeadLock() {
        EventExecutor e = this.executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(this.toString());
        }
    }

private void incWaiters() {
        if (this.waiters == 32767) {
            throw new IllegalStateException("too many waiters: " + this);
        } else {
            ++this.waiters;
        }
    }
Channel和UnSafe

Channel负责对外提供操作IO的接口,而UnSafe是Channel的内部接口类,如其名一样是不安全的操作,所以封装在接口内部不让外部调用,而实际的操作IO最终都是在Unsafe中执行。

//Channel调用连接为例,跟踪实现连接请求的过程
ChannelFuture connect(SocketAddress var1);

//DefaultChannelPipeline中执行,实际是调用尾部的pipeline
 public ChannelFuture connect(SocketAddress remoteAddress) {
        return this.tail.connect(remoteAddress);
    }

//AbstractChannelHandlerContext是Pipeline容器中的对象,
//持续寻找所有handler执行对象,直到全部被调用
 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        AbstractChannelHandlerContext next = this.findContextOutbound();
        next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
        return promise;
    }
   private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;

        do {
            ctx = ctx.prev;
        } while(!ctx.outbound);

        return ctx;
    }

//而真实的执行是寻找到UnSafe的Invoker
   public ChannelHandlerInvoker invoker() {
        return this.invoker == null ? this.channel().unsafe().invoker() : this.invoker;
    }

 public void invokeConnect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        } else if (ChannelHandlerInvokerUtil.validatePromise(ctx, promise, false)) {
            if (this.executor.inEventLoop()) {
                ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
            } else {
                this.safeExecuteOutbound(new OneTimeTask() {
                    public void run() {
                        ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
                    }
                }, promise);
            }

        }
    }

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


分享文章:Netty线程模型、Future、Channel总结和源码分析-创新互联
分享URL:http://cdxtjz.cn/article/joodd.html

其他资讯