zookeeper在代码里怎么编写 (zookeeper连接数据库方式)

上一篇:ZooKeeper源码分析(五)Leader选举流程源码流程分析

接上一篇,我们知道类FastLeaderElection是一个Leader选举类,选举肯定要和其他机器通信,而且我们在这个里面看见了一个比较关键的一行代码

zookeeper搭建只有主机能运行,zookeeper为基础的框架

刚刚开始queueSendMap肯定是没有值的,直接返回false,会直接到上面connectAll方法里面去

zookeeper搭建只有主机能运行,zookeeper为基础的框架

这个方法的注释就已经说明了,"尝试与每个服务器建立连接(如果有的话)",我们就看这个方法就好了

zookeeper搭建只有主机能运行,zookeeper为基础的框架

先判断senderWorkerMap有没有sid对应的信息,因为连接建立好之后会在senderWorkerMap将sid开辟的资源保存,没有的话就会利用Socket建立连接,在初始化资源,我们看一下initiateConnection方法

zookeeper搭建只有主机能运行,zookeeper为基础的框架

这里主要是发送自己的sid给其他机器,并且如果建立连接成功为其他机器开辟相应的数据结构来保证后续的收发消息

zookeeper搭建只有主机能运行,zookeeper为基础的框架

其他机器则会通过之前创建QuorumCnxManager时构建出来的Listener去监听其他机器发送过来的消息

zookeeper搭建只有主机能运行,zookeeper为基础的框架

这里主要是通过BIO形式去监听一个端口号

zookeeper搭建只有主机能运行,zookeeper为基础的框架

如果建立成功则初始化对应资源,如果对方的sid小于自己则将Socket连接关闭

public boolean receiveConnection(Socket sock) {
        Long sid = null;
        
        try {
            // Read server id
            DataInputStream din = new DataInputStream(sock.getInputStream());
            sid = din.readLong();
            if (sid == QuorumPeer.OBSERVER_ID) {
                /*
                 * Choose identifier at random. We need a value to identify
                 * the connection.
                 */
                
                sid = observerCounter--;
                LOG.info("Setting arbitrary identifier to observer: " + sid);
            }
        } catch (IOException e) {
            closeSocket(sock);
            LOG.warn("Exception reading or writing challenge: " + e.toString());
            return false;
        }
        
        //If wins the challenge, then close the new connection.
        /** 这里会做一个判断,如果对方的sid小于自己则将Socket连接关闭
         * 只允许比自己sid大的机器主动连接自己,防止重复建立TCP连接*/
        if (sid < self.getId()) {
            /*
             * This replica might still believe that the connection to sid is
             * up, so we have to shut down the workers before trying to open a
             * new connection.
             */
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }

            /*
             * Now we start a new connection
             */
            LOG.debug("Create new connection to server: " + sid);
            closeSocket(sock);
            connectOne(sid);

            // Otherwise start worker threads to receive data.
        } else {
            /** 如果建立成功则会为相应的机器开辟对应的数据结构进行收发消息 */
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();
            
            senderWorkerMap.put(sid, sw);
            
            if (!queueSendMap.containsKey(sid)) {
                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            }
            
            sw.start();
            rw.start();
            
            return true;    
        }
        return false;
    }

zookeeper搭建只有主机能运行,zookeeper为基础的框架