ZooKeeper服务发现客户端
单位想把那套ZooKeeper集群用起来.
作为配置中心,一旦出现问题,所有服务都是中断的.
尤其又涉及ACL权限控制.
想来想去,我觉得这个客户端还是自己封装,安全系数大些.
好长时间也没有写过代码了,确实感觉很生疏.写的逻辑稍微有点乱.
别到时候上线了,因为自己的客户端引发问题.那就尴尬了.
要求:
1.在客户端实现负载均衡
2.客户端ACL密码加密
3.在客户端实现缓存,如果ZK挂了,还能继续提供服务
4.如果服务提供方在ZK正常注册,但是服务调用方出现调用异常,需要在缓存中给这个服务做一个标识,在一段时间内(默认5分钟)不再提供这个服务地址.
5.这段代码别出事儿..心中默念一百遍..
代码结构:
zoo.properties
server=192.168.1.105:2181,192.168.1.106:2181,192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181,192.168.1.110:2181
invoker.vdfs=/service/vdfs/upload/dx
r=pqrMCxdhQUhKEgMFZoJG3vM2tDdIGqbA/rlZt9RkL8s=
rw=MqnMrPsX3c8RX7b+NES4mQ==
provider./service/vdfs/upload/dx=http://192.168.1.111
nodename=192.168.16.114:8080
Metadata,提供配置文件加解密和配置文件提取
- package com.vv.zkClient;
-
- import java.io.IOException;
- import java.io.InputStream;
- import java.security.SecureRandom;
- import java.util.Enumeration;
- import java.util.Map;
- import java.util.Properties;
-
- import javax.crypto.Cipher;
- import javax.crypto.SecretKey;
- import javax.crypto.SecretKeyFactory;
- import javax.crypto.spec.DESKeySpec;
-
- import org.apache.commons.codec.binary.Base64;
- import org.jboss.netty.util.internal.ConcurrentHashMap;
-
- public class Metadata {
- private static volatile Metadata META = null;
- public static String decrypt(byte[] content, String key) {
- try {
- SecureRandom random = new SecureRandom();
- DESKeySpec desKey = new DESKeySpec(key.getBytes());
- SecretKeyFactory keyFactory = SecretKeyFactory.getInstance("DES");
- SecretKey securekey = keyFactory.generateSecret(desKey);
- Cipher cipher = Cipher.getInstance("DES");
- cipher.init(Cipher.DECRYPT_MODE, securekey, random);
- byte[] result = cipher.doFinal(content);
- return new String(result);
- } catch (Throwable e) {
- e.printStackTrace();
- }
- return null;
- }
- public static byte[] encrypt(String content, String key) {
- try {
- SecureRandom random = new SecureRandom();
- DESKeySpec desKey = new DESKeySpec(key.getBytes());
- SecretKeyFactory keyFactory = SecretKeyFactory.getInstance("DES");
- SecretKey securekey = keyFactory.generateSecret(desKey);
- Cipher cipher = Cipher.getInstance("DES");
- cipher.init(Cipher.ENCRYPT_MODE, securekey, random);
- byte[] result = cipher.doFinal(content.getBytes());
- return result;
- } catch (Throwable e) {
- e.printStackTrace();
- }
- return null;
- }
- public static Metadata getInstance() {
- if (META == null) {
- synchronized (Metadata.class) {
-
- if (META == null) {
- META = new Metadata();
- }
- }
- }
- return META;
- }
-
- private String connectionString = null;
- private Map<String, String> invokerMap = new ConcurrentHashMap<String, String>();
- private String key = "12344321";
-
- private String nodename = null;
-
- private Properties p = new Properties();
-
- private Map<String, String> providerMap = new ConcurrentHashMap<String, String>();
-
- private String readOnlyPassword = null;
-
- private String readwritePassword = null;
-
- public Metadata() {
- InputStream in = Metadata.class.getClassLoader().getResourceAsStream("zoo.properties");
- init(in);
- }
-
- public String getConnectionString() {
- return connectionString;
- }
-
- public Map<String, String> getInvokerMap() {
- return invokerMap;
- }
-
- public String getLocal() {
- return nodename;
- }
-
- public Map<String, String> getProviderMap() {
- return providerMap;
- }
-
- public String getReadOnlyPassword() {
- String password = new String(decrypt(Base64.decodeBase64(this.readOnlyPassword.getBytes()), this.key));
-
- return password;
- }
-
- public String getReadwritePassword() {
- String password = new String(decrypt(Base64.decodeBase64(this.readwritePassword.getBytes()), this.key));
-
- return password;
- }
-
- private void init(InputStream in) {
- try {
- p.load(in);
- connectionString = p.getProperty("server", "");
- readOnlyPassword = p.getProperty("r", "");
- readwritePassword = p.getProperty("rw", "");
- nodename = p.getProperty("nodename", "");
- Enumeration<Object> enums = p.keys();
- while (enums.hasMoreElements()) {
- String key = (String) enums.nextElement();
- if (key.startsWith("invoker.")) {
- invokerMap.put(key.replace("invoker.", ""), p.getProperty(key));
- } else if (key.startsWith("provider.")) {
- providerMap.put(key.replace("provider.", ""), p.getProperty(key));
- }
- }
-
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
-
- public static void main(String[] args) {
-
- }
- }
ServiceProvider 服务提供方调用,将自己注册到配置中心
- package com.vv.zkClient;
-
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.concurrent.ScheduledThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.retry.RetryUntilElapsed;
- import org.apache.zookeeper.CreateMode;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class ServiceProvider implements Runnable {
- private static Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);
-
- private static volatile ServiceProvider SERVICEPROVIDER = null;
- public static ServiceProvider getInstance() {
- if (SERVICEPROVIDER == null) {
- synchronized (ServiceProvider.class) {
-
- if (SERVICEPROVIDER == null) {
- SERVICEPROVIDER = new ServiceProvider();
- }
- }
- }
- return SERVICEPROVIDER;
- }
-
- private CuratorFramework client;
-
- private Metadata meta;
-
- private ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1);
-
- private ServiceProvider() {
- this.meta = Metadata.getInstance();
- connection();
- threadPool.scheduleAtFixedRate(this, 1, 5, TimeUnit.SECONDS);
- }
-
- private void connection() {
- try {
- this.client = CuratorFrameworkFactory.newClient(meta.getConnectionString(),
- new RetryUntilElapsed(2000, 1000));
- client.start();
- client.getZookeeperClient().getZooKeeper().addAuthInfo("digest", meta.getReadwritePassword().getBytes());
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.error(e.getMessage());
- }
- }
-
- public void run() {
- try {
- for (String serviceNode : meta.getProviderMap().keySet()) {
- String serviceURL = meta.getProviderMap().get(serviceNode);
- String serviceName = serviceNode + "/" + meta.getLocal();
- if (client.checkExists().forPath(serviceName) == null) {
- String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "||" + serviceURL;
- client.create().withMode(CreateMode.EPHEMERAL).forPath(serviceName, date.getBytes());
- LOGGER.info("Created Node->/"{}/",Node Data->/"{}/"", serviceName,date);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- client.close();
- connection();
- LOGGER.error(e.getMessage());
- }
- }
-
- public static void main(String[] args) throws InterruptedException {
- ServiceProvider.getInstance();
- Thread.sleep(Integer.MAX_VALUE);
- }
- }
ServiceInvoker 服务调用方使用,获取服务连接地址
- package com.vv.zkClient;
-
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.CopyOnWriteArrayList;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.cache.PathChildrenCache;
- import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
- import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
- import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
- import org.apache.curator.retry.RetryUntilElapsed;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- class Service {
- private long lastErrorTime = -1;
- private String name = null;
- private String url = null;
-
- @Override
- public boolean equals(Object obj) {
- Service s = (Service) obj;
- return (this.getName() + this.getUrl()).equals(s.getName() + s.getUrl());
- }
-
- public long getLastErrorTime() {
- return lastErrorTime;
- }
-
- public String getName() {
- return name;
- }
-
- public String getUrl() {
- return url;
- }
-
- public void setLastErrorTime(long lastErrorTime) {
- this.lastErrorTime = lastErrorTime;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public void setUrl(String url) {
- this.url = url;
- }
-
- }
-
- public class ServiceInvoker {
- private static ServiceInvoker INVOKER = null;
-
- private static Logger LOGGER = LoggerFactory.getLogger(ServiceInvoker.class);
- public static ServiceInvoker getInstance() {
- if (INVOKER == null) {
- synchronized (ServiceInvoker.class) {
-
- if (INVOKER == null) {
- INVOKER = new ServiceInvoker();
- }
- }
- }
- return INVOKER;
- }
-
- private CuratorFramework client;
- private CountDownLatch isInitialized = new CountDownLatch(1);
- private Map<String, Iterator<Service>> itMap = new ConcurrentHashMap<String, Iterator<Service>>();
-
- private Metadata meta;
-
- private Map<String, List<Service>> providerMap = new ConcurrentHashMap<String, List<Service>>();
-
- private ExecutorService threadPool = Executors.newSingleThreadExecutor();
-
- private ServiceInvoker() {
- this.meta = Metadata.getInstance();
- connection();
- listener();
- }
-
- private void connection() {
- try {
- this.client = CuratorFrameworkFactory.newClient(meta.getConnectionString(),
- new RetryUntilElapsed(2000, 1000));
- client.start();
- client.getZookeeperClient().getZooKeeper().addAuthInfo("digest", meta.getReadOnlyPassword().getBytes());
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.error(e.getMessage());
- }
- }
-
- public String get(final String serviceNode) {
-
- Callable<String> c = new Callable<String>() {
-
- public String call() throws Exception {
- List<Service> list = providerMap.get(serviceNode);
- if (list == null) {
- list = new CopyOnWriteArrayList<Service>();
- providerMap.put(serviceNode, list);
- }
-
- Iterator<Service> it = itMap.get(serviceNode);
- if (it == null || !it.hasNext()) {
- it = list.iterator();
- itMap.put(serviceNode, it);
- }
- if (!it.hasNext()) {
- LOGGER.error("节点:/"{}/",没有任何可用服务", serviceNode);
- return "";
- }
- Service service = it.next();
- long now = System.currentTimeMillis();
- int retryCount = 5;
- while (service.getLastErrorTime() != -1 && (now - service.getLastErrorTime()) < 1000 * 60 * 5) {
- retryCount--;
- if (retryCount == 0) {
- LOGGER.error("节点:/"{}/",没有任何可用服务", serviceNode);
- return "";
- }
- if (it.hasNext()) {
- service = it.next();
- } else {
- it = providerMap.get(serviceNode).iterator();
- itMap.put(serviceNode, it);
- }
- }
- return service.getUrl();
- }
-
- };
-
- String serviceUrl = "";
-
- try {
- isInitialized.await();
- serviceUrl = threadPool.submit(c).get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- return serviceUrl;
- }
-
- private void listener() {
- for (String serviceNode : meta.getInvokerMap().values()) {
-
- PathChildrenCache cache = new PathChildrenCache(client, serviceNode, true);
- try {
- cache.start(StartMode.POST_INITIALIZED_EVENT);
- } catch (Exception e) {
- e.printStackTrace();
- }
- cache.getListenable().addListener(new PathChildrenCacheListener() {
-
- public void childEvent(CuratorFramework arg0, final PathChildrenCacheEvent event) throws Exception {
-
- final String type = event.getType().name();
- if (type.equals("INITIALIZED")) {
- LOGGER.info("ZooKeeper数据初始化完成:INITIALIZED");
- isInitialized.countDown();
- return;
- }
- final String data = event.getData().getPath();
- final String serviceNode = data.substring(0, data.lastIndexOf("/"));
-
- final String serviceUrl = new String(event.getData().getData()).split("//|//|")[1];
- Runnable r = new Runnable() {
- public void run() {
- List<Service> list = providerMap.get(serviceNode);
- if (list == null) {
- list = new CopyOnWriteArrayList<Service>();
- providerMap.put(serviceNode, list);
- }
- Service s = new Service();
- s.setName(serviceNode);
- s.setUrl(serviceUrl);
- if (type.equals("CHILD_ADDED")) {
- list.add(s);
- LOGGER.info("新增节点:/"{}/",服务地址:/"{}/"", data, serviceUrl);
- } else if (type.equals("CHILD_REMOVED")) {
- for (int i = 0; i < list.size(); i++) {
- Service service = list.get(i);
- if (service.equals(s)) {
- list.remove(i);
- LOGGER.info("删除节点:/"{}/",服务地址:/"{}/"", data, serviceUrl);
- }
-
- }
- }
- }
- };
- threadPool.submit(r);
- }
- });
-
- }
- }
-
- public void setLastErrorTime(final String url) {
- Runnable r = new Runnable() {
- public void run() {
- for (List<Service> list : providerMap.values()) {
- Iterator<Service> it = list.iterator();
- while (it.hasNext()) {
- Service service = it.next();
- if (service.getUrl().equals(url)) {
- service.setLastErrorTime(System.currentTimeMillis());
- LOGGER.error("节点:/"{}/",调用URL:/"{}/"异常,该节点停止服务5分钟", service.getName(), service.getUrl());
- }
- }
- }
- }
- };
- threadPool.submit(r);
- }
-
- public static void main(String[] args) throws Exception {
- ServiceInvoker s = ServiceInvoker.getInstance();
- while (true) {
- String str = s.get("/service/vdfs/upload/dx");
- s.setLastErrorTime(str);
- Thread.sleep(5000);
- }
- }
- }
MAVEN配置:
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>2.4.2</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
-
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.4.2</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>3.8.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <version>20041127.091804</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- <version>1.1.7</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>1.1.7</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.7</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- <version>1.7.7</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- <version>1.7.7</version>
- <scope>runtime</scope>
- </dependency>
- </dependencies>
-
- <build>
- <resources>
- <resource>
- <directory>src/main/java</directory>
- <includes>
- <include>**/*.properties</include>
- </includes>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
正文到此结束