目标:介绍基于zookeeper的来实现的远程通信、介绍dubbo-remoting-zookeeper内的源码解析。
对于zookeeper我相信肯定不陌生,在之前的文章里面也有讲到zookeeper来作为注册中心。在这里,基于zookeeper来实现远程通讯,duubo封装了zookeeper client,来和zookeeper server通讯。
下面是类图:
public interface ZookeeperClient { /** * 创建client * @param path * @param ephemeral */ void create(String path, boolean ephemeral); /** * 删除client * @param path */ void delete(String path); /** * 获得子节点集合 * @param path * @return */ List<String> getChildren(String path); /** * 向zookeeper的该节点发起订阅,获得该节点所有 * @param path * @param listener * @return */ List<String> addChildListener(String path, ChildListener listener); /** * 移除该节点的子节点监听器 * @param path * @param listener */ void removeChildListener(String path, ChildListener listener); /** * 新增状态监听器 * @param listener */ void addStateListener(StateListener listener); /** * 移除状态监听 * @param listener */ void removeStateListener(StateListener listener); /** * 判断是否连接 * @return */ boolean isConnected(); /** * 关闭客户端 */ void close(); /** * 获得url * @return */ URL getUrl(); } 复制代码
该接口是基于zookeeper的客户端接口,其中封装了客户端的一些方法。
该类实现了ZookeeperClient接口,是客户端的抽象类,它实现了一些公共逻辑,把具体的doClose、createPersistent等方法抽象出来,留给子类来实现。
/** * url对象 */ private final URL url; /** * 状态监听器集合 */ private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>(); /** * 客户端监听器集合 */ private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>(); /** * 是否关闭 */ private volatile boolean closed = false; 复制代码
@Override public void create(String path, boolean ephemeral) { // 如果不是临时节点 if (!ephemeral) { // 判断该客户端是否存在 if (checkExists(path)) { return; } } // 获得/的位置 int i = path.lastIndexOf('/'); if (i > 0) { // 创建客户端 create(path.substring(0, i), false); } // 如果是临时节点 if (ephemeral) { // 创建临时节点 createEphemeral(path); } else { // 递归创建节点 createPersistent(path); } } 复制代码
该方法是创建客户端的方法,其中createEphemeral和createPersistent方法都被抽象出来。具体看下面的类的介绍。
@Override public void addStateListener(StateListener listener) { // 状态监听器加入集合 stateListeners.add(listener); } 复制代码
该方法就是增加状态监听器。
@Override public void close() { if (closed) { return; } closed = true; try { // 关闭 doClose(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } 复制代码
该方法是关闭客户端,其中doClose方法也被抽象出。
/** * 关闭客户端 */ protected abstract void doClose(); /** * 递归创建节点 * @param path */ protected abstract void createPersistent(String path); /** * 创建临时节点 * @param path */ protected abstract void createEphemeral(String path); /** * 检测该节点是否存在 * @param path * @return */ protected abstract boolean checkExists(String path); /** * 创建子节点监听器 * @param path * @param listener * @return */ protected abstract TargetChildListener createTargetChildListener(String path, ChildListener listener); /** * 为子节点添加监听器 * @param path * @param listener * @return */ protected abstract List<String> addTargetChildListener(String path, TargetChildListener listener); /** * 移除子节点监听器 * @param path * @param listener */ protected abstract void removeTargetChildListener(String path, TargetChildListener listener); 复制代码
上述的方法都是被抽象的,又它的两个子类来实现。
该类继承了AbstractZookeeperClient,是zk客户端的实现类。
/** * zk客户端包装类 */ private final ZkClientWrapper client; /** * 连接状态 */ private volatile KeeperState state = KeeperState.SyncConnected; 复制代码
该类有两个属性,其中client就是核心所在,几乎所有方法都调用了client的方法。
public ZkclientZookeeperClient(URL url) { super(url); // 新建一个zkclient包装类 client = new ZkClientWrapper(url.getBackupAddress(), 30000); // 增加状态监听 client.addListener(new IZkStateListener() { /** * 如果状态改变 * @param state * @throws Exception */ @Override public void handleStateChanged(KeeperState state) throws Exception { ZkclientZookeeperClient.this.state = state; // 如果状态变为了断开连接 if (state == KeeperState.Disconnected) { // 则修改状态 stateChanged(StateListener.DISCONNECTED); } else if (state == KeeperState.SyncConnected) { stateChanged(StateListener.CONNECTED); } } @Override public void handleNewSession() throws Exception { // 状态变为重连 stateChanged(StateListener.RECONNECTED); } }); // 启动客户端 client.start(); } 复制代码
该方法是构造方法,同时在里面也做了创建客户端和启动客户端的操作。其他方法都是实现了父类抽象的方法,并且调用的是client方法,为举个例子:
@Override public void createPersistent(String path) { try { // 递归创建节点 client.createPersistent(path); } catch (ZkNodeExistsException e) { } } 复制代码
该方法是递归场景节点,调用的就是client.createPersistent(path)。
该类是Curator框架提供的一套高级API,简化了ZooKeeper的操作,从而对客户端的实现。
/** * 框架式客户端 */ private final CuratorFramework client; 复制代码
public CuratorZookeeperClient(URL url) { super(url); try { // 工厂创建者 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } // 创建客户端 client = builder.build(); // 添加监听器 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState state) { // 如果为状态为lost,则改变为未连接 if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { // 改变状态为连接 CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { // 改变状态为未连接 CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); } } }); // 启动客户端 client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } 复制代码
该方法是构造方法,同样里面也包含了客户端创建和启动的逻辑。
其他的方法也一样是实现了父类的抽象方法,举个列子:
@Override public void createPersistent(String path) { try { client.create().forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } 复制代码
@SPI("curator") public interface ZookeeperTransporter { /** * 连接服务器 * @param url * @return */ @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) ZookeeperClient connect(URL url); } 复制代码
该方法是zookeeper的信息交换接口。同样也是一个可扩展接口,默认实现CuratorZookeeperTransporter类。
public class ZkclientZookeeperTransporter implements ZookeeperTransporter { @Override public ZookeeperClient connect(URL url) { // 新建ZkclientZookeeperClient实例 return new ZkclientZookeeperClient(url); } } 复制代码
该类实现了ZookeeperTransporter,其中就是创建了ZkclientZookeeperClient实例。
public class CuratorZookeeperTransporter implements ZookeeperTransporter { @Override public ZookeeperClient connect(URL url) { // 创建CuratorZookeeperClient实例 return new CuratorZookeeperClient(url); } } 复制代码
该接口实现了ZookeeperTransporter,是ZookeeperTransporter默认的实现类,同样也是创建了;对应的CuratorZookeeperClient实例。
该类是zk客户端的包装类。
/** * 超时事件 */ private long timeout; /** * zk客户端 */ private ZkClient client; /** * 客户端状态 */ private volatile KeeperState state; /** * 客户端线程 */ private ListenableFutureTask<ZkClient> listenableFutureTask; /** * 是否开始 */ private volatile boolean started = false; 复制代码
public ZkClientWrapper(final String serverAddr, long timeout) { this.timeout = timeout; listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() { @Override public ZkClient call() throws Exception { // 创建zk客户端 return new ZkClient(serverAddr, Integer.MAX_VALUE); } }); } 复制代码
设置了超时时间和客户端线程。
public void start() { // 如果客户端没有开启 if (!started) { // 创建连接线程 Thread connectThread = new Thread(listenableFutureTask); connectThread.setName("DubboZkclientConnector"); connectThread.setDaemon(true); // 开启线程 connectThread.start(); try { // 获得zk客户端 client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS); } catch (Throwable t) { logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t); } started = true; } else { logger.warn("Zkclient has already been started!"); } } 复制代码
该方法是客户端启动方法。
public void addListener(final IZkStateListener listener) { // 增加监听器 listenableFutureTask.addListener(new Runnable() { @Override public void run() { try { client = listenableFutureTask.get(); // 增加监听器 client.subscribeStateChanges(listener); } catch (InterruptedException e) { logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly, which may cause unpredictable exception!"); } catch (ExecutionException e) { logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e); } } }); } 复制代码
该方法是为客户端添加监听器。
其他方法都是对于 客户端是否还连接的检测,可自行查看代码。
public interface ChildListener { /** * 子节点修改 * @param path * @param children */ void childChanged(String path, List<String> children); } 复制代码
该接口是子节点的监听器,当子节点变化的时候会用到。
public interface StateListener { int DISCONNECTED = 0; int CONNECTED = 1; int RECONNECTED = 2; /** * 状态修改 * @param connected */ void stateChanged(int connected); } 复制代码
该接口是状态监听器,其中定义了一个状态更改的方法以及三种状态。
该部分相关的源码解析地址: github.com/CrazyHZM/in…
该文章讲解了基于zookeeper的来实现的远程通信、介绍dubbo-remoting-zookeeper内的源码解析,关键需要对zookeeper有所了解。该篇之后,远程通讯的源码解析就先到这里了,其实大家会发现,如果能够对讲解api系列的文章了解透了,那么后面的文章九很简单,就好像轨道铺好,可以直接顺着轨道往后,根本没有阻碍。接下来我将开始对rpc模块进行讲解。