Zookeeper 是一个开源的分布式协调服务,目前由 Apache 进行维护。Zookeeper 可以用于实现分布式系统中常见的发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。它具有以下特性:
version: '3'
services:
zookeeper:
image: zookeeper:3.7.0
container_name: zookeeper
restart: unless-stopped
volumes:
- "./zookeeper/data:/data"
- "./zookeeper/datalog:/datalog"
ports:
- "2181:2181"
# webui
zookeeper-webui:
image: tobilg/zookeeper-webui
container_name: zookeeper-webui
restart: unless-stopped
environment:
ZK_DEFAULT_NODE: zookeeper:2181
depends_on:
- zookeeper
links:
- zookeeper
ports:
- "8089:8080"
run
docker-compose -f docker-compose-zookeeper.yml -p zookeeper up -d
可视化界面访问地址:[http://ip地址:8089] ,输入 [{宿主主机ip}:2181/]进入
桌面可视化工具PrettyZoo: https://github.com/vran-dev/PrettyZoo
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>zookeeper</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Apache Zookeeper-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
package com.et.zookeeper.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;
@Configuration
@Slf4j
public class ZookeeperConfig {
@Value("${zookeeper.address}")
private String connectString;
@Value("${zookeeper.timeout}")
private int timeout;
@Bean(name = "zkClient")
public ZooKeeper zkClient() {
ZooKeeper zooKeeper = null;
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
//After the connection is successful, the watcher will be called back to monitor. This connection operation is asynchronous. After the new statement is executed, the subsequent code will be called directly.
// Multiple service addresses can be specified: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
//If the response event from the server is received, the connection is successful
countDownLatch.countDown();
}
}
});
countDownLatch.await();
log.info("【init zooKeeper connect....】={}", zooKeeper.getState());
} catch (Exception e) {
log.error("init ZooKeeper connect error....】={}", e);
}
return zooKeeper;
}
}
package com.et.zookeeper.api;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
@Component
@Slf4j
public class ZkApi {
@Autowired
private ZooKeeper zkClient;
/**
* check node is exist
*
* @param path
* @param needWatch
* @return
*/
public Stat exists(String path, boolean needWatch) {
try {
return zkClient.exists(path, needWatch);
} catch (Exception e) {
log.error("【 node exception】{},{}", path, e);
return null;
}
}
/**
* check node is exist and set watcher
* @param path
* @param watcher
* @return
*/
public Stat exists(String path, Watcher watcher) {
try {
return zkClient.exists(path, watcher);
} catch (Exception e) {
log.error("【node exception】{},{}", path, e);
return null;
}
}
/**
* create persist node
*
* @param path
* @param data
*/
public boolean createNode(String path, String data) {
try {
zkClient.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return true;
} catch (Exception e) {
log.error("【create persist node exception】{},{},{}", path, data, e);
return false;
}
}
/**
* update persist node
*
* @param path
* @param data
*/
public boolean updateNode(String path, String data) {
try {
//The data version of zk starts counting from 0. If the client passes -1, it means that the zk server needs to be updated based on the latest data. If there is no atomicity requirement for the update operation of zk's data node, you can use -1.
//The version parameter specifies the version of the data to be updated. If the version is different from the real version, the update operation will fail. Specify version as -1 to ignore the version check.
zkClient.setData(path, data.getBytes(), -1);
return true;
} catch (Exception e) {
log.error("【update persist node exception】{},{},{}", path, data, e);
return false;
}
}
/**
* delete persist node
*
* @param path
*/
public boolean deleteNode(String path) {
try {
//The version parameter specifies the version of the data to be updated. If the version is different from the real version, the update operation will fail. Specify version as -1 to ignore the version check.
zkClient.delete(path, -1);
return true;
} catch (Exception e) {
log.error("【delete persist node exception】{},{}", path, e);
return false;
}
}
/**
* Get the child nodes of the current node (excluding grandchild nodes)
*
* @param path
*/
public List<String> getChildren(String path) throws KeeperException, InterruptedException {
List<String> list = zkClient.getChildren(path, false);
return list;
}
/**
* Get the value of the specified node
*
* @param path
* @return
*/
public String getData(String path, Watcher watcher) {
try {
Stat stat = new Stat();
byte[] bytes = zkClient.getData(path, watcher, stat);
return new String(bytes);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
package com.et.zookeeper.api;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@Slf4j
public class WatcherApi implements Watcher {
@Override
public void process(WatchedEvent event) {
log.info("【Watcher event】={}", event.getState());
log.info("【Watcher path】={}", event.getPath());
log.info("【Watcher type】={}", event.getType()); // three type: create,delete,update
}
}
server:
port: 8088
zookeeper:
address: 127.0.0.1:2181
timeout: 4000
package com.et.zookeeper.controller;
import com.et.zookeeper.api.ZkApi;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@Slf4j
public class HelloWorldController {
@Autowired
private ZkApi zkApi;
@GetMapping(value = "createNode")
public boolean createNode(String path, String data) {
log.debug("ZookeeperController create node {},{}", path, data);
return zkApi.createNode(path, data);
}
}