一个服务发现系统提供下面几个机制:
注册它们有用的服务
定位一个单一特殊服务的实例
当一个服务改变时发出通知
一个服务实例使用类ServiceInstance作为服务实例类。ServiceInstance有一个名称、id、地址、端口或者ssl端口以及可选负载(用户定义)。ServiceInstances序列化并存储到ZooKeeper服务器上。
base path |_______ service A name |__________ instance 1 id --> (serialized ServiceInstance) |__________ instance 2 id --> (serialized ServiceInstance) |__________ ... |_______ service B name |__________ instance 1 id --> (serialized ServiceInstance) |__________ instance 2 id --> (serialized ServiceInstance) |__________ ... |_______ ...
主要抽象类是ServiceProvider,它提供了命名的服务、提供策略等服务。服务策略有三种: Round Robin, Random 和 Sticky(总是选择相同一个)。
ServiceProvider开始服务类必须提供start()开始方法和完成时调用close()。唯一在ServiceProvider类的方法有下面定义:
public ServiceInstance<T> getInstance() throws Exception Return an instance for a single use. IMPORTANT: users should not hold on to the instance returned. A fresh instance should always be retrieved. Returns: the instance to use
为了分配一个ServiceProvider,你必须有一个ServiceDiscovery类。服务发现类是由ServiceDiscoveryBuilder构建的。
我们创建一个注册服务的简单程序,改程序主要由三个类组成。
DistributedService: 注册服务类,提供服务的地址,服务内容信息。
DistributedServer: 分布式服务的服务类,创建服务实例,注册服务。
DistributedDiscovery:分布式服务发现类,服务类的初始化,添加、删除、呈现服务。
DistributedService类定义:
import org.codehaus.jackson.map.annotate.JsonRootName; /** * @author: elite_jigang@163.com */ @JsonRootName("services") public class DistributedService { private String address ; private String info; public DistributedService(){ this("", ""); } public DistributedService(String address, String info){ this.address = address; this.info = info; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } }
DistributedServer类定义:
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.UriSpec; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import java.io.Closeable; import java.io.IOException; /** * @author: elite_jigang@163.com */ public class DistributedServer implements Closeable { private final ServiceDiscovery<DistributedService> serviceDiscovery; private final ServiceInstance<DistributedService> thisInstance; DistributedServer(CuratorFramework client, String path, String serviceName, String address, String info ) throws Exception { // in a real application, you'd have a convention of some kind for the URI layout UriSpec uriSpec = new UriSpec("{scheme}://zookeeper.com:{port}"); thisInstance = ServiceInstance.<DistributedService>builder() .name(serviceName) .payload(new DistributedService(address, info)) .port((int) (65535 * Math.random())) // in a real application, you'd use a common port .uriSpec(uriSpec) .build(); // if you mark your payload class with @JsonRootName the provided JsonInstanceSerializer will work JsonInstanceSerializer<DistributedService> serializer = new JsonInstanceSerializer<DistributedService>(DistributedService.class); serviceDiscovery = ServiceDiscoveryBuilder.builder(DistributedService.class) .client(client) .basePath(path) .serializer(serializer) .thisInstance(thisInstance) .build(); } public ServiceInstance<DistributedService> getThisInstance() { return thisInstance; } public void start() throws Exception { serviceDiscovery.start(); } @Override public void close() throws IOException { serviceDiscovery.close(); } }
DistributedDiscover类定义:
import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import util.ZooKeeperConf; import java.util.Collection; import java.util.List; /** * @author elite_jigang@163.com */ public class DistributedDiscovery { private List<DistributedServer> servers = Lists.newArrayList(); private static final String PATH = "/discovery/distributed_services"; private CuratorFramework client = null; private ServiceDiscovery<DistributedService> serviceDiscovery = null; public DistributedDiscovery(){ init(); } public void init(){ client = CuratorFrameworkFactory.newClient(ZooKeeperConf.CLUSTER_NODES, new ExponentialBackoffRetry(1000, 3)); client.start(); JsonInstanceSerializer<DistributedService> serializer = new JsonInstanceSerializer<DistributedService>(DistributedService.class); serviceDiscovery = ServiceDiscoveryBuilder.builder(DistributedService.class) .client(client) .basePath(PATH) .serializer(serializer) .build(); try { serviceDiscovery.start(); } catch (Exception e) { System.out.println("serviceServiceDiscovery.start() with an exception." + e.getMessage()); e.printStackTrace(); } } public void addService(String serviceName, String address, String info) throws Exception { DistributedServer server = new DistributedServer(client, PATH, serviceName,address, info); servers.add(server); server.start(); System.out.println(serviceName + " added"); } public void listServices() throws Exception { Collection<String> serviceNames = serviceDiscovery.queryForNames(); System.out.println(serviceNames.size() + " type(s)"); for (String serviceName : serviceNames) { Collection<ServiceInstance<DistributedService>> instances = serviceDiscovery.queryForInstances(serviceName); System.out.println(serviceName); for(ServiceInstance<DistributedService> instance: instances){ outputInstance(instance); } } } private static void outputInstance(ServiceInstance<DistributedService> instance) { System.out.println("/t address: " + instance.getPayload().getAddress()+ ", info: " + instance.getPayload().getInfo()+ ": " + instance.buildUriSpec()); } public List<DistributedServer> getServers() { return servers; } }
下面是测试类ServicesMain
import java.lang.management.ManagementFactory; import java.util.concurrent.Callable; /** * @author elite_jigang@163.com */ public class ServicesMain { public static void main(String[] args) throws Exception { DistributedDiscovery dd = new DistributedDiscovery(); String name = ManagementFactory.getRuntimeMXBean().getName(); System.out.printf("getRuntimeMXBean mame: %s", name); int index = name.indexOf('@'); // pid as the service name . dd.addService(name.substring(0, index), "192.168.11.2:8089", "cluster node 1"); // dd.addService(name, "192.168.11.3:8089", "cluster node 2"); dd.listServices(); // Callable<Boolean > callable = new Callable<Boolean>(){ // @Override // public Boolean call() throws Exception{ // System.out.println(); // boolean isStop = true; // while(isStop){ // //wait 10 seconds // Thread.sleep(10000); // isStop = false; // } // return true; // } // }; // callable.call(); Thread.currentThread().sleep(10000); dd.listServices(); } }
多次执行上面的代码,你可以看到第一次执行时,会打印出所有的注册服务。一旦有的注册服务消失,发现到的服务也会减少。
getRuntimeMXBean mame: 8040@JOHNLAU--localhost: 172.24.219.99 8040 added --localhost: 172.24.219.99 8040@JOHNLAU added 10 type(s) test-b test-a 2620 address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:42118 8040@JOHNLAU address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:17009 7896 10004@JOHNLAU 2620@JOHNLAU address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:58955 10004 7896@JOHNLAU 8040 address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:53458 10 type(s) test-b test-a 2620 address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:42118 8040@JOHNLAU address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:17009 7896 10004@JOHNLAU 2620@JOHNLAU address: 192.168.11.3:8089, info: cluster node 2: http://zookeeper.com:58955 10004 7896@JOHNLAU 8040 address: 192.168.11.2:8089, info: cluster node 1: http://zookeeper.com:53458 Disconnected from the target VM, address: '127.0.0.1:58347', transport: 'socket'
作者:GreatElite 发表于2016/2/13 23:50:01 原文链接
阅读:3 评论:0 查看评论