189 8069 5689

zk工厂方法如何实现NIOServerCnxnFactory

这篇文章主要介绍了zk工厂方法如何实现NIOServerCnxnFactory,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

创新互联是一家集网站建设,杞县企业网站建设,杞县品牌网站建设,网站定制,杞县网站建设报价,网络营销,网络优化,杞县网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。

NIOServerCnxnFactory类

内部类

AbstractSelectThread

AcceptThread

SelectorThread

zk工厂方法如何实现NIOServerCnxnFactory

属性

ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT

10s session过期时间

ZOOKEEPER_NIO_NUM_SELECTOR_THREADS

selector 线程数

ZOOKEEPER_NIO_NUM_WORKER_THREADS

worker 线程数

directBuffer

buffer用来线程间数据交互

ipMap

限制ip上连接数

cnxnExpiryQueue

连接失效时间分桶队列

workerPool

WorkerService worker执行服务

acceptThread

接收新连接,simple round-robin 分配到选择线程

selectorThreads

 
  

方法

停止接收

private void pauseAccept(long millisecs) {
    acceptKey.interestOps(0);
    try {
        selector.select(millisecs);
    } catch (IOException e) {
        // ignore
    } finally {
        acceptKey.interestOps(SelectionKey.OP_ACCEPT);
    }
}

private boolean doAccept() {
    boolean accepted = false;
    SocketChannel sc = null;
    try {
        sc = acceptSocket.accept();
        accepted = true;
        InetAddress ia = sc.socket().getInetAddress();
        int cnxncount = getClientCnxnCount(ia);

        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
            throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
        }

        LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());

        sc.configureBlocking(false);

        // Round-robin assign this connection to a selector thread
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectorThreads.iterator();
        }
        SelectorThread selectorThread = selectorIterator.next();
        if (!selectorThread.addAcceptedConnection(sc)) {
            throw new IOException("Unable to add connection to selector queue"
                                  + (stopped ? " (shutdown in progress)" : ""));
        }
        acceptErrorLogger.flush();
    } catch (IOException e) {
        // accept, maxClientCnxns, configureBlocking
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
        fastCloseSock(sc);
    }
    return accepted;
}




private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            key = accepted.register(selector, SelectionKey.OP_READ);
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn);
            addCnxn(cnxn);
        } catch (IOException e) {
            // register, createConnection
            cleanupSelectionKey(key);
            fastCloseSock(accepted);
        }
    }
}



configure


获取客户端连接数
private int getClientCnxnCount(InetAddress cl) {
    Set s = ipMap.get(cl);
    if (s == null) {
        return 0;
    }
    return s.size();
}


创建连接
protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException {
    return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
}


创建连接
private void addCnxn(NIOServerCnxn cnxn) throws IOException {
    InetAddress addr = cnxn.getSocketAddress();
    if (addr == null) {
        throw new IOException("Socket of " + cnxn + " has been closed");
    }
    Set set = ipMap.get(addr);
    if (set == null) {
        // in general we will see 1 connection from each
        // host, setting the initial cap to 2 allows us
        // to minimize mem usage in the common case
        // of 1 entry --  we need to set the initial cap
        // to 2 to avoid rehash when the first entry is added
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(new ConcurrentHashMap(2));
        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        Set existingSet = ipMap.putIfAbsent(addr, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    set.add(cnxn);

    cnxns.add(cnxn);
    touchCnxn(cnxn);
}

思考:
为什么单机和集群模式启动不一样
单机可以直接从日志,快照恢复数据
集群根据角色划分,涉及到数据同步

感谢你能够认真阅读完这篇文章,希望小编分享的“zk工厂方法如何实现NIOServerCnxnFactory”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!


当前名称:zk工厂方法如何实现NIOServerCnxnFactory
标题URL:http://cdxtjz.cn/article/jdosgp.html

其他资讯