zookeeper源码分析-线程模型与数据流转分析
技术分析开始的第一篇文章还是要简单的说一下它的基本用法和基本概念,然后再说一下这系列文章的大体分析流程。zk 这个系列的文章也不会很多,应该就是三篇基本结束,如果在算上后面的集群的话也就是四篇,不过集群的东西我会在写完并发编程之后再写,暂时zk 主要的源码分析其实就是三部分,主流程启动、三个线程启动、任务链执行。
本次主要是聊一下zk 的基本概念、线程模型还有主流程启动。zk 的基本概念这种东西在度娘上很轻易就能搜到的东西,所以这篇文章的重点就是放在zookeeper 主流程启动时做了些什么,还有它的线程模型是怎样的,和netty 之前有什么区别。那么现在就开始发车了。
zookeeper的基本概念和安装
说到zookeeper 基本都是想到和dubbo 结合使用,这个是经典的rpc 架构模式,zookeeper 在这个模式里面主要就是被使用为注册中心,就算是目前最流行的spring cloud 组件中的注册中心也有使用zookeeper 的情况。
zk 除了被当做注册中心使用之外,还能做配置中心,分布式锁等等。它本身就是一个很强大的中间件,它的底层代码也是用java 来编写的,它 对Java 也提供了三种客户端:zookeeper 原生的API,Curator,zkClient。zookeeper 原生的API 偏底层不是很好用,一般是用的就是Curator,而Curator 也就是封装了这些API。
说会zk 本身的一些基本概念,我们聊的话,就是从它的来源、数据结构、节点类型三个方面简单的分析一下。
zookeeper的来源
zookeeper 是Apache 软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。
zookeeper 的架构通过冗余服务实现高可用行(CP)。
zookeeper 的设计目标是将那些复杂且容易出错的分布式一致性服务封住起来,构成一个高效可靠的原语集,并以一系列简单 易用的接口提供给用户使用。
它是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式 协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。
数据结构
zookeeper本身是一个树形目录服务(名称空间),非常类似于标准文件系统,key-value 的形式存储。名称 key 由斜线 / 分 割的一系列路径元素,zookeeper 名称空间中的每个节点都是由一个路径来标识的。
它的每一个节点可以存储数据,每一个节点还有对应的状态信息。zookeeper 的key 可以理解为:节点的完整路径;value 就是:节点中的数据。
节点类型
安装于源码下载
zk 的安装还是比较简单的,从官网下载压缩包,然后解压,再然后就是找到conf 目录下的zoo_sample.cfg 文件,复制粘贴在同级目录下,修改名称为zoo.cfg,同时修改文件中的dataDir 属性,该属性值为zk 存储数据的目录,最后bin 目录下的zkServer,如果是Windows 系统的话就是”.cmd”尾缀的文件启动,如果是Linux 系统的话就是”.sh”尾缀的文件启动。
至于zk 的源码可以在我的CSDN 资源上面下载,地址:zk源码注释版 。
线程模型(重点)–主从reactor 多线程模型
zk 的线程模型是重点中的重点,不理解线程模型就没有办法跟代码,线程模型也是面试问到zk 后必须要回答的点,不说废话,先上图。
从上图基本可以看到其实和netty 的线程模型基本类似,主从reactor 多线程模型,不同的是zk 不光是主从reactor 多线程模型,它对于部分的内容更加细化,因为它除了网络传输还有很大一部分的业务逻辑处理,下面我先简单的描述一下这幅图的大体流程,后面我们跟过源码之后,再总结全流程。
- 当client 请求过来的时候,先会在acceptThread 线程上通过第一个selector 注册serverSocketChannel,并将信息添加到队列acceptedQueue 中;
- 然后会在SelectorThread 线程上读取acceptedQueue 中的信息,然后将socketChannel 注册在第二个selector 上监听读写;
- 当读行为被监听到时,会将信息封装为IOWorkRequest 对象,然后封装为线程对象交给workService 线程池来执行。
这就线程模型的一个基本流程,这个还有一些关于对象的封装、线程池的管理、过期检测等等,后面我们看源码的时候会逐一分析到。
zookeeper的主流程启动
zk 的启动是通过bin 目录下的zkServer.cmd 文件或者zkServer.sh 文件,这两者的区别就是对应着两个不同的操作系统,前者是Windows,后者是Linux,它们的本质的区别就是各种的启动命令不一样,但是调用的程序入口都是QuorumPeerMain 对象。
找到程序入口后,可以直接跟进到QuorumPeerMain 对象的main 方法,再进去其调用的initializeAndRun 方法,这里可以看到zk 的两种不同启动方式,单机启动和集群启动,不过在进入两种启动之前,zk 还需要做清理快照和事务日志。
1 | /** |
至于是单机启动和集群启动,我们一点一点分析。
1 | /** |
单机启动
单机启动跟进的就是ZooKeeperServerMain 类的main 方法,然后是initializeAndRun 方法,其余的都重要,最关键的是其中调用的runFromConfig 方法。
runFromConfig 方法就是:根据配置启动运行的具体内容,这里看代码之前,其实我们就可以根据上面的主从reactor 多线程模型中看到,他最重要的内容就是计算出上面三个线程的多少,配置线程池,并启动其线程,至于zk 本身的数据库快照,数据存储本身我们不关注,zk 源码我们主要了解的它的思维、架构模式。
我们这里直接就看对应的源码,如果对于快照初始之类的代码有兴趣可以直接去看我放在CSDN 上的源码(zk源码)。这里我们直接从runFromConfig 方法的CountDownLatch 对象创建开始看。
CountDownLatch–同步工具类,用于阻塞主线程
CountDownLatch 对象是一个同步工具类,这里zk 用于阻塞主线程,因为zk 的具体工作基本都是异步线程做的,主线程只用于创建和启动异步线程。这里可以看到主要就是创建了这个对应,并注册了一个监听对象ZooKeeperServerShutdownHandler,这个对象的作用就是当zk 出现状态异常的时候,将CountDownLatch 进行停止,调用的是其countDown 方法。
1 | /** |
ServerCnxnFactory–主服务的创建和启动
中间的代码可以略过,我们直接看ServerCnxnFactory 的创建。下面的createFactory 方法代码可以看到,如果我们没有默认配置的话,zk 会默认选择NIO 的服务端NIOServerCnxnFactory 对象,当然我们也可以通过配置来选择Netty 的服务端。
1 | cnxnFactory = ServerCnxnFactory.createFactory(); |
1 | public static ServerCnxnFactory createFactory() throws IOException { |
当启动完成之后,我们直接用它的configure 方法,进行参数、线程信息的配置。
结合上面的模型图来看,zk 首先创建的是cnxnExpiryQueue 队列和expirerThread 线程,cnxnExpiryQueue 队列后续存放的就是socketChannel 的一些信息,而expirerThread 线程就是来检测这个会话是否过期了。
1 | /** |
然后这里会根据计算机的CPU核数,来计算出对应的线程数量,这里只要知道就行。
逻辑:当前系统核数除以2后再开根号的整数,最少1个,也就是说我的电脑如果是8核,那么就是8 / 2 = 4 在开根号 = 2个,如果是4核,那么就是1。然后再用这个核数计算出来的数据 * 2 就能得到后面selectorThread 和workService 的线程数量。
1 | /** |
接下来就可以直接看到selectorThread 线程的创建,注意:这里只是创建没有启动,这里创建的数量就是上面计算的大小。
1 | private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>(); |
1 | for (int i = 0; i < numSelectorThreads; ++i) { |
这个方法的最后就是serverSocketChannel 的监听线程的创建,还有serverSocketChann 的端口绑定。
1 | ServerSocketChannel ss; |
1 | listenBacklog = backlog; |
当ServerCnxnFactory 的configure 方法结束之后,就是startup 启动方法的调用了。但是这个方法,我们只要关注里面的start 方法调用就行了,其余的数据库会话回复、还有各个组件调用之类的,有兴趣的自己看下就行。
start 方法中首先创建的就是上面configure 方法中没有创建的workerPool 线程池,具体的线程数量也是和selectorThread 的线程数量一样的。
1 | if (workerPool == null) { |
然后就是对所有的selectorThread 线程启动了。
1 | /** |
最后就是启动acceptThread 线程用于连接的监听,还有expirerThread 线程用于socketChannel 的过期检测。
1 | //启动acceptThread开始接收连接 |
到这里ServerCnxnFactory 服务的创建和启动就结束了,后续的代码其实我们的关注点就是一个,就是上面说的CountDownLatch 同步工具类的阻塞内容。这里就是调用了提供出来的await 方法进行了阻塞,这样才能保证zk 是启动着,其余的异步线程才能一直运行,只有最后将zk 停止之后,才会运行下面对应的停止逻辑。
1 | shutdownLatch.await();//阻塞等待zookeeper服务关闭或出现内部错误 |
小结
zk 的主流程的启动说白了就是对于主从reactor 多线程模型 中的几个线程的创建、配置、启动,还有就是传输服务端的选择是NIO 还是Netty,至于它本身的数据配置还有日志服务还有各个组件反倒是其次。