转载

[原]Curator服务发现

3.3.1 服务发现

一个服务发现系统提供下面几个机制:

 注册它们有用的服务

 定位一个单一特殊服务的实例

 当一个服务改变时发出通知

3.3.1.1 服务实例

一个服务实例使用类ServiceInstance作为服务实例类。ServiceInstance有一个名称、id、地址、端口或者ssl端口以及可选负载(用户定义)。ServiceInstances序列化并存储到ZooKeeper服务器上。

base path        |_______ service A name                     |__________ instance 1 id --> (serialized ServiceInstance)                     |__________ instance 2 id --> (serialized ServiceInstance)                     |__________ ...        |_______ service B name                     |__________ instance 1 id --> (serialized ServiceInstance)                     |__________ instance 2 id --> (serialized ServiceInstance)                     |__________ ...        |_______ ...

3.3.1.2 服务提供者

主要抽象类是ServiceProvider,它提供了命名的服务、提供策略等服务。服务策略有三种: Round Robin, Random 和 Sticky(总是选择相同一个)。

ServiceProvider开始服务类必须提供start()开始方法和完成时调用close()。唯一在ServiceProvider类的方法有下面定义:

public ServiceInstance<T> getInstance()                             throws Exception Return an instance for a single use. IMPORTANT: users should not hold on to the instance returned. A fresh instance should always be retrieved. Returns: the instance to use

3.3.1.3 服务发现

为了分配一个ServiceProvider,你必须有一个ServiceDiscovery类。服务发现类是由ServiceDiscoveryBuilder构建的。

3.3.1.4 实例程序

我们创建一个注册服务的简单程序,改程序主要由三个类组成。

 DistributedService: 注册服务类,提供服务的地址,服务内容信息。

 DistributedServer: 分布式服务的服务类,创建服务实例,注册服务。

 DistributedDiscovery:分布式服务发现类,服务类的初始化,添加、删除、呈现服务。

DistributedService类定义:

import org.codehaus.jackson.map.annotate.JsonRootName;  /**  * @author: elite_jigang@163.com  */ @JsonRootName("services") public class DistributedService {     private String address ;     private String info;      public DistributedService(){         this("", "");     }     public DistributedService(String address, String info){         this.address = address;         this.info = info;     }     public String getAddress() {         return address;     }      public void setAddress(String address) {         this.address = address;     }      public String getInfo() {         return info;     }      public void setInfo(String info) {         this.info = info;     } }

DistributedServer类定义:

import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.UriSpec; import org.apache.curator.x.discovery.details.JsonInstanceSerializer;  import java.io.Closeable; import java.io.IOException;  /**  * @author: elite_jigang@163.com  */ public class DistributedServer  implements Closeable {      private final ServiceDiscovery<DistributedService> serviceDiscovery;     private final ServiceInstance<DistributedService> thisInstance;      DistributedServer(CuratorFramework client,                       String path,                       String serviceName,                       String address,                       String info ) throws Exception {         // in a real application, you'd have a convention of some kind for the URI layout         UriSpec uriSpec = new UriSpec("{scheme}://zookeeper.com:{port}");          thisInstance = ServiceInstance.<DistributedService>builder()                 .name(serviceName)                 .payload(new DistributedService(address, info))                 .port((int) (65535 * Math.random())) // in a real application, you'd use a common port                 .uriSpec(uriSpec)                 .build();          // if you mark your payload class with @JsonRootName the provided JsonInstanceSerializer will work         JsonInstanceSerializer<DistributedService> serializer = new                 JsonInstanceSerializer<DistributedService>(DistributedService.class);          serviceDiscovery = ServiceDiscoveryBuilder.builder(DistributedService.class)                 .client(client)                 .basePath(path)                 .serializer(serializer)                 .thisInstance(thisInstance)                 .build();     }      public ServiceInstance<DistributedService> getThisInstance() {         return thisInstance;     }      public void start() throws Exception {         serviceDiscovery.start();     }       @Override     public void close() throws IOException {         serviceDiscovery.close();     } }

DistributedDiscover类定义:

import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import util.ZooKeeperConf;  import java.util.Collection; import java.util.List;  /**  * @author elite_jigang@163.com  */ public class DistributedDiscovery {      private List<DistributedServer> servers = Lists.newArrayList();     private static final String PATH = "/discovery/distributed_services";     private CuratorFramework client = null;     private ServiceDiscovery<DistributedService> serviceDiscovery = null;      public DistributedDiscovery(){         init();     }     public void init(){         client = CuratorFrameworkFactory.newClient(ZooKeeperConf.CLUSTER_NODES,                 new ExponentialBackoffRetry(1000, 3));         client.start();          JsonInstanceSerializer<DistributedService> serializer = new                 JsonInstanceSerializer<DistributedService>(DistributedService.class);         serviceDiscovery = ServiceDiscoveryBuilder.builder(DistributedService.class)                 .client(client)                 .basePath(PATH)                 .serializer(serializer)                 .build();         try {             serviceDiscovery.start();         } catch (Exception e) {             System.out.println("serviceServiceDiscovery.start() with an exception." +                     e.getMessage());             e.printStackTrace();         }     }      public void addService(String serviceName,                            String address,                            String info) throws Exception {         DistributedServer server = new DistributedServer(client, PATH, serviceName,address,  info);         servers.add(server);         server.start();          System.out.println(serviceName + " added");     }      public void listServices() throws Exception {         Collection<String> serviceNames = serviceDiscovery.queryForNames();         System.out.println(serviceNames.size() + " type(s)");         for (String serviceName : serviceNames) {             Collection<ServiceInstance<DistributedService>> instances = serviceDiscovery.queryForInstances(serviceName);             System.out.println(serviceName);             for(ServiceInstance<DistributedService> instance: instances){                 outputInstance(instance);             }         }     }     private static void outputInstance(ServiceInstance<DistributedService> instance) {         System.out.println("/t address: " + instance.getPayload().getAddress()+ ", info: " +                 instance.getPayload().getInfo()+ ": " + instance.buildUriSpec());     }     public List<DistributedServer> getServers() {         return servers;     } }

下面是测试类ServicesMain

import java.lang.management.ManagementFactory; import java.util.concurrent.Callable;  /**  * @author elite_jigang@163.com  */ public class ServicesMain {      public static void main(String[] args) throws Exception {         DistributedDiscovery dd = new DistributedDiscovery();          String name = ManagementFactory.getRuntimeMXBean().getName();         System.out.printf("getRuntimeMXBean mame: %s", name);         int index = name.indexOf('@');         // pid as the service name .         dd.addService(name.substring(0, index), "192.168.11.2:8089", "cluster node 1");           //         dd.addService(name, "192.168.11.3:8089", "cluster node 2");         dd.listServices(); //        Callable<Boolean > callable = new Callable<Boolean>(){ //            @Override //            public Boolean call() throws Exception{ //                System.out.println(); //                boolean isStop = true; //                while(isStop){ //                    //wait 10 seconds //                    Thread.sleep(10000); //                    isStop = false; //                } //                return true; //            } //        }; //        callable.call();         Thread.currentThread().sleep(10000);         dd.listServices();     } }
多次执行上面的代码,你可以看到第一次执行时,会打印出所有的注册服务。一旦有的注册服务消失,发现到的服务也会减少。 
getRuntimeMXBean mame: 8040@JOHNLAU--localhost: 172.24.219.99 8040 added --localhost: 172.24.219.99 8040@JOHNLAU added 10 type(s) test-b test-a 2620      address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:42118 8040@JOHNLAU      address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:17009 7896 10004@JOHNLAU 2620@JOHNLAU      address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:58955 10004 7896@JOHNLAU 8040      address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:53458 10 type(s) test-b test-a 2620      address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:42118 8040@JOHNLAU      address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:17009 7896 10004@JOHNLAU 2620@JOHNLAU      address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:58955 10004 7896@JOHNLAU 8040      address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:53458 Disconnected from the target VM, address: '127.0.0.1:58347', transport: 'socket'

作者:GreatElite 发表于2016/2/13 23:50:01 原文链接

阅读:3 评论:0 查看评论

原文  http://itindex.net/detail/55184-curator-服务-发现
正文到此结束
Loading...