本文主要介绍如何使用 ZooKeeper 提供的原生 Java API 来操作 ZooKeeper,文中所使用到的软件版本:Java 1.8.0_191、ZooKeeper 3.6.0、JUnit 4.13。
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> </dependency>
package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Examples of the basic ZooKeeper operations (create, read, update, delete) using the
 * native ZooKeeper client API, written as JUnit 4 tests.
 *
 * <p>Every test talks to the server at {@link #connectString}; the tests are demos, not
 * isolated unit tests. NOTE(review): the tests create children of {@code /javatest} without
 * creating that parent first, so it presumably has to exist on the server beforehand.
 */
public class ZookeeperCase {
    // ZooKeeper address; for a cluster, separate the addresses with commas,
    // e.g. 10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
    private static String connectString = "10.49.196.10:2181";
    private static int sessionTimeout = 2 * 1000;
    /** Upper bound on how long {@link #before()} waits for the session to be established. */
    private static final long CONNECT_TIMEOUT_MS = 10_000L;

    private ZooKeeper zooKeeper;

    /**
     * Opens the ZooKeeper session and blocks until it is actually connected.
     *
     * <p>The {@code ZooKeeper} constructor returns immediately and connects in the background,
     * so the default watcher is used to wait for the {@code SyncConnected} state.
     *
     * @throws Exception if the client cannot be created or the connection is not established
     *                   within {@link #CONNECT_TIMEOUT_MS}
     */
    @Before
    public void before() throws Exception {
        final CountDownLatch connected = new CountDownLatch(1);
        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        if (!connected.await(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
            // Fail the test with a clear message instead of letting it run against a dead handle.
            zooKeeper.close();
            zooKeeper = null;
            throw new IOException("Timed out connecting to ZooKeeper at " + connectString);
        }
        System.out.println(zooKeeper.getState());
    }

    /**
     * Closes the session if one was opened.
     *
     * @throws Exception if closing the session is interrupted
     */
    @After
    public void after() throws Exception {
        // before() may have failed before assigning the handle.
        if (zooKeeper != null) {
            zooKeeper.close();
        }
    }

    /**
     * Creates nodes with different ACLs and create modes, synchronously and asynchronously.
     *
     * @throws Exception if any create call fails
     */
    @Test
    public void create() throws Exception {
        /*
         * Synchronously create a persistent node, ACL world:anyone:cdrwa.
         * Equivalent CLI command: create /javatest/node1 test world:anyone:cdrwa
         */
        zooKeeper.create("/javatest/node1", "test".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        /*
         * Synchronously create a persistent node, ACL world:anyone:cr.
         * Equivalent CLI command: create /javatest/node2 test world:anyone:cr
         * Permissions are bit flags, so they are combined with bitwise OR.
         */
        zooKeeper.create("/javatest/node2", "test".getBytes(StandardCharsets.UTF_8),
                Collections.singletonList(new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.READ,
                        ZooDefs.Ids.ANYONE_ID_UNSAFE)),
                CreateMode.PERSISTENT);

        /*
         * Asynchronously create an ephemeral sequential node, ACL ip:127.0.0.1:c.
         * Equivalent CLI command: create -s -e /javatest/node3 test ip:127.0.0.1:c
         */
        final CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.create("/javatest/node3", "test".getBytes(StandardCharsets.UTF_8),
                Collections.singletonList(new ACL(ZooDefs.Perms.CREATE, new Id("ip", "127.0.0.1"))),
                CreateMode.EPHEMERAL_SEQUENTIAL,
                new AsyncCallback.StringCallback() {
                    @Override
                    public void processResult(int rc, String path, Object ctx, String name) {
                        System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ", name=" + name);
                        counter.countDown();
                    }
                }, "上下文对象,异步回调时会传递给callback");
        // Wait for the callback so the session is not closed before the request completes.
        counter.await();

        /*
         * Synchronously create a persistent node,
         * ACL digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa.
         * Equivalent CLI command:
         *   create /javatest/node4 test digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
         * /javatest/node4 is only accessible after adding the auth user
         * (addauth digest jack:123456).
         */
        zooKeeper.create("/javatest/node4", "test".getBytes(StandardCharsets.UTF_8),
                Collections.singletonList(new ACL(ZooDefs.Perms.ALL,
                        new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg="))),
                CreateMode.PERSISTENT);

        /*
         * Synchronously create a persistent sequential node with a TTL of 5 seconds,
         * ACL world:anyone:cdrwa.
         * Equivalent CLI command: create -s -t 5000 /javatest/node5 test
         * NOTE(review): TTL nodes presumably require extended types to be enabled on the
         * server (zookeeper.extendedTypesEnabled) - confirm against the server config.
         */
        Stat stat = new Stat();
        zooKeeper.create("/javatest/node5", "test".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, stat, 5000);
        System.out.println(stat);
    }

    /**
     * Reads node data synchronously and asynchronously.
     *
     * @throws Exception if a read fails
     */
    @Test
    public void getData() throws Exception {
        // Synchronous read.
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData("/javatest/node1", false, stat);
        System.out.println(toText(data));
        System.out.println(stat);

        // Asynchronous read; node4 is protected by a digest ACL, so authenticate first.
        zooKeeper.addAuthInfo("digest", "jack:123456".getBytes(StandardCharsets.UTF_8));
        final CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.getData("/javatest/node4", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx
                        + ",data=" + toText(data) + ",stat=" + stat);
                counter.countDown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }

    /**
     * Updates node data synchronously and asynchronously.
     *
     * @throws Exception if an update fails
     */
    @Test
    public void setData() throws Exception {
        // Synchronous update; version -1 matches any version.
        Stat stat = zooKeeper.setData("/javatest/node1", "test2".getBytes(StandardCharsets.UTF_8), -1);
        System.out.println(stat);

        // Asynchronous update; node4 is protected by a digest ACL, so authenticate first.
        zooKeeper.addAuthInfo("digest", "jack:123456".getBytes(StandardCharsets.UTF_8));
        final CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.setData("/javatest/node4", "test2".getBytes(StandardCharsets.UTF_8), -1,
                new AsyncCallback.StatCallback() {
                    @Override
                    public void processResult(int rc, String path, Object ctx, Stat stat) {
                        System.out.println("rc=" + rc + ",path=" + path + ",stat=" + stat);
                        counter.countDown();
                    }
                }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }

    /**
     * Deletes nodes synchronously and asynchronously.
     *
     * @throws Exception if a delete fails
     */
    @Test
    public void delete() throws Exception {
        // Synchronous delete; version -1 matches any version.
        zooKeeper.delete("/javatest/node1", -1);

        // Asynchronous delete.
        final CountDownLatch counter = new CountDownLatch(1);
        zooKeeper.delete("/javatest/node2", -1, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx);
                counter.countDown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }

    /** Decodes znode data as UTF-8 text; a node without data ({@code null}) yields "". */
    private static String toText(byte[] data) {
        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
    }
}
DataMonitor类实现对节点的监控,节点有变化时会回调DataMonitorListener.process方法,该方法由调用方根据业务来实现;WatcherCase类传入需要的参数来启动DataMonitor。
该例子是根据 ZooKeeper 官网的 Java 示例(Executor / DataMonitor)改造而来,相较官网的例子更简单一些。
package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

/**
 * Watches a single znode and forwards every change to a {@link DataMonitorListener}.
 *
 * <p>The constructor opens the ZooKeeper session and registers the first watch. {@link #run()}
 * then blocks the calling thread until {@link #close()} is invoked (by the caller, or
 * internally when the session expires or authentication fails) and finally closes the session.
 */
public class DataMonitor implements Runnable {
    /** Session timeout passed to the ZooKeeper client, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 20000;

    private ZooKeeper zk;
    private DataMonitorListener listener;
    /** Released by {@link #close()}; a latch cannot miss a close() that happens before run(). */
    private final CountDownLatch closed = new CountDownLatch(1);

    /**
     * Called when the watched node changes; receives the watch event and the node's new data.
     */
    public interface DataMonitorListener {
        void process(WatchedEvent event, byte[] data);
    }

    /**
     * Connects to ZooKeeper and starts watching {@code znode}.
     *
     * @param hostPort ZooKeeper connect string, e.g. {@code host:2181}
     * @param znode    path of the node to watch
     * @param listener receives the event and the node data on every change
     * @throws Exception if the ZooKeeper client cannot be created
     */
    public DataMonitor(String hostPort, String znode, DataMonitorListener listener) throws Exception {
        this.listener = listener;

        // Handles the result of the asynchronous exists() call that (re-)registers the watch.
        AsyncCallback.StatCallback callback = new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",stat=" + stat);
                if (rc == KeeperException.Code.OK.intValue()
                        || rc == KeeperException.Code.NONODE.intValue()) {
                    // Watch is registered (the node may or may not exist yet); nothing to do.
                    return;
                }
                if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()
                        || rc == KeeperException.Code.NOAUTH.intValue()) {
                    // Unrecoverable for this session: stop monitoring.
                    close();
                    return;
                }
                // Any other error: try to register the watch again.
                zk.exists(znode, true, this, null);
            }
        };

        // Default watcher: receives session state changes and node events.
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event);
                if (event.getType() == Event.EventType.None) {
                    // Session state change, not a node event.
                    switch (event.getState()) {
                        case SyncConnected:
                            break;
                        case Expired:
                            close();
                            break;
                        default:
                            break;
                    }
                    return;
                }

                String path = event.getPath();
                try {
                    byte[] bytes = zk.getData(path, false, null);
                    listener.process(event, bytes);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // e.g. the node was deleted; report it and keep monitoring.
                    e.printStackTrace();
                }
                if (path != null && path.equals(znode)) {
                    // Watches are one-shot: register again to keep monitoring.
                    zk.exists(znode, true, callback, null);
                }
            }
        };

        zk = new ZooKeeper(hostPort, SESSION_TIMEOUT_MS, watcher);
        zk.exists(znode, true, callback, null);
    }

    /**
     * Blocks until {@link #close()} is called or the thread is interrupted, then closes the
     * ZooKeeper session. The interrupt status is restored before returning.
     */
    @Override
    public void run() {
        boolean interrupted = false;
        try {
            closed.await();
        } catch (InterruptedException e) {
            interrupted = true;
        }
        try {
            zk.close();
        } catch (InterruptedException e) {
            interrupted = true;
        }
        if (interrupted) {
            // Restore the flag only after closing, so close() itself is not cut short.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops monitoring: releases the thread blocked in {@link #run()}. Safe to call more than
     * once and before {@code run()} has started.
     */
    public void close() {
        closed.countDown();
    }
}
package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;

import java.nio.charset.StandardCharsets;

/**
 * Example of watching a znode: starts a {@code DataMonitor} and prints the node's data
 * every time it changes.
 */
public class WatcherCase {
    /**
     * Starts the monitor and blocks until it is closed.
     *
     * @param args optional: {@code args[0]} is the ZooKeeper connect string and
     *             {@code args[1]} the znode path; the demo defaults are used when omitted
     * @throws Exception if the monitor cannot be created
     */
    public static void main(String[] args) throws Exception {
        String hostPort = args.length > 0 ? args[0] : "10.49.196.10:2181";
        String znode = args.length > 1 ? args[1] : "/watchtest";

        DataMonitor.DataMonitorListener listener = new DataMonitor.DataMonitorListener() {
            @Override
            public void process(WatchedEvent event, byte[] data) {
                // TODO: handle the event according to the actual business needs.
                if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                    // A znode may hold no data, in which case data is null.
                    System.out.println(data == null ? "" : new String(data, StandardCharsets.UTF_8));
                }
            }
        };
        new DataMonitor(hostPort, znode, listener).run();
    }
}