RPC is the foundation of SOA. I have used SOA frameworks such as dubbo in quite a few projects, and this style of remote method invocation is not only fun but also of real practical value: it lets us split a large system into many modules, start a different number of instances of each module according to its load, and have the modules communicate with each other transparently via RPC. In this way a centralized system can be turned into a distributed application, improving its scalability and making better use of hardware resources.
With some spare time on hand, let's implement RPC step by step by ourselves and enjoy the process.
Remote method call or not, the first thing is to write two services that can be called.
They are very simple. One is a greeting service with two methods, say hello and say byebye; the other is a calculation service with an add method and a subtract method.
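The interfaces themselves are not reproduced here, so the following is only a minimal sketch of what they might look like. I use Integer rather than int in CalculateService, which is an assumption on my part but fits the reflection code later on (Class.forName works for java.lang.Integer, not for the primitive int).

public interface GreetingService {
    String sayHello(String name);
    String sayByebye(String name);
}

public interface CalculateService {
    Integer add(Integer a, Integer b);
    Integer subtract(Integer a, Integer b);
}

public class GreetingServiceImpl implements GreetingService {
    public String sayHello(String name)  { return "hello, " + name; }
    public String sayByebye(String name) { return "byebye, " + name; }
}

public class CalculateServiceImpl implements CalculateService {
    public Integer add(Integer a, Integer b)      { return a + b; }
    public Integer subtract(Integer a, Integer b) { return a - b; }
}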
In addition, let's put together a simple thread pool to make better use of resources.
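The helper class is not shown either; a minimal sketch, assuming one shared pool behind the getExecutorInstance() call used later, could be:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolHelper {

    // one shared thread pool for the whole process
    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();

    public static ExecutorService getExecutorInstance() {
        return EXECUTOR;
    }
}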
1. Publishing services on the server side
GreetingService is registered on port 3456 and CalculateService on port 6543.
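A minimal sketch of what Provider.java might look like, assuming publish takes a map from port to service instance, as in the later versions:

import java.util.HashMap;
import java.util.Map;

public class Provider {
    public static void main(String[] args) throws Exception {
        Map<Integer, Object> map = new HashMap<Integer, Object>();
        map.put(3456, new GreetingServiceImpl());
        map.put(6543, new CalculateServiceImpl());
        RpcFramework.publish(map);
    }
}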
2. Subscribing to services on the client side
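Correspondingly, a sketch of Customer.java, assuming subscribe takes the interface plus the provider's host and port as in the Netty version later (the argument values are made up):

public class Customer {
    public static void main(String[] args) throws Exception {
        GreetingService greeting = RpcFramework.subscribe(GreetingService.class, "127.0.0.1", 3456);
        System.out.println(greeting.sayHello("rick"));

        CalculateService calculate = RpcFramework.subscribe(CalculateService.class, "127.0.0.1", 6543);
        System.out.println(calculate.add(1, 2));
    }
}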
3. The framework that manages publishing and subscribing
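The BIO version of the framework is not reproduced here either, so this is only a sketch of its shape: publish hands each port/service pair to a WorkThread running on the thread pool, and subscribe returns a JDK dynamic proxy backed by an InvocationProxy.

import java.lang.reflect.Proxy;
import java.util.Map;

public class RpcFramework {

    public static void publish(Map<Integer, Object> services) {
        // one listener thread per published service port
        for (Map.Entry<Integer, Object> entry : services.entrySet()) {
            ThreadPoolHelper.getExecutorInstance()
                    .execute(new WorkThread(entry.getKey(), entry.getValue()));
        }
    }

    @SuppressWarnings("unchecked")
    public static <T> T subscribe(Class<T> interfaceClass, String host, int port) {
        // the proxy opens a connection to host:port on every method call
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationProxy(host, port));
    }
}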
4. The service publishing thread
Publishing a service essentially means creating a ServerSocket that listens on the service's port; when a request arrives, the method named in the request is invoked dynamically with the given arguments, and the result is written back to the client.
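Again only a sketch: one ServerSocket per service, Java serialization for the request, reflection for the dispatch. The wire format (method name, parameter types, arguments) is my assumption, though it mirrors what the NIO version later sends as JSON.

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class WorkThread implements Runnable {

    private int port;
    private Object service;

    public WorkThread(int port, Object service) {
        this.port = port;
        this.service = service;
    }

    @Override
    public void run() {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            while (true) {
                try (Socket socket = serverSocket.accept();
                     ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
                     ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
                    // read method name, parameter types and argument values
                    String methodName = (String) in.readObject();
                    Class<?>[] parameterTypes = (Class<?>[]) in.readObject();
                    Object[] args = (Object[]) in.readObject();
                    // dynamically invoke the published service and send back the result
                    Method method = service.getClass().getMethod(methodName, parameterTypes);
                    out.writeObject(method.invoke(service, args));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}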
5. The client-side proxy
Using the dynamic proxy pattern, a method call on the client actually opens a socket connection, sends the method and its arguments to the server, and receives the result the server returns.
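A matching sketch of the BIO InvocationProxy, with the same assumed wire format as the WorkThread sketch above:

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.Socket;

public class InvocationProxy implements InvocationHandler {

    private String host;
    private int port;

    public InvocationProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        try (Socket socket = new Socket(host, port);
             // create the output stream first so both sides can exchange stream headers
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
             ObjectInputStream in = new ObjectInputStream(socket.getInputStream())) {
            // send method name, parameter types and argument values
            out.writeObject(method.getName());
            out.writeObject(method.getParameterTypes());
            out.writeObject(args);
            out.flush();
            // wait for and return the result computed on the server
            return in.readObject();
        }
    }
}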
Running result
The previous step already gave us a working RPC model, but it still has plenty of shortcomings. For one, the communication uses blocking IO; since a remote method call does not need to keep a long-lived connection, switching to non-blocking IO can improve efficiency considerably.
1. Customer.java on the client and Provider.java on the server stay unchanged, since we are only changing the communication mechanism.
2. The publish method in RpcFramework.java needs a small change
In the BIO version we created one ServerSocket per service; here we create a single selector and register one channel per service with it, each channel in the OP_ACCEPT state.
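A sketch of what that publish method could look like, with WorkThread and ThreadPoolHelper as before; details beyond what is described above are assumptions:

public static void publish(Map<Integer, Object> map) throws Exception {
    Selector selector = Selector.open();
    for (int port : map.keySet()) {
        // one non-blocking server channel per service port, all registered with the same selector
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.configureBlocking(false);
        channel.socket().bind(new InetSocketAddress(port));
        channel.register(selector, SelectionKey.OP_ACCEPT);
    }
    // a single thread multiplexes all service channels
    ThreadPoolHelper.getExecutorInstance().execute(new WorkThread(selector, map));
}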
3. WorkThread.java is modified as follows
public class WorkThread implements Runnable {

    private Selector selector;
    private Map<Integer, Object> map;
    private final static int BLOCK = 4096;
    private ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
    private ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);

    public WorkThread(Selector selector, Map<Integer, Object> map) {
        this.selector = selector;
        this.map = map;
    }

    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey sk = iter.next();
                    iter.remove();
                    ServerSocketChannel server = null;
                    SocketChannel client = null;
                    int count = 0;
                    if (sk.isAcceptable()) {
                        server = (ServerSocketChannel) sk.channel();
                        client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                    } else if (sk.isReadable()) {
                        client = (SocketChannel) sk.channel();
                        receiveBuffer.clear();
                        count = client.read(receiveBuffer);
                        if (count > 0) {
                            String s = new String(receiveBuffer.array(), 0, count);
                            ObjectMapper mapper = new ObjectMapper();
                            JsonNode node = mapper.readTree(s);
                            String methodName = node.get("method").asText();
                            JsonNode ptNode = node.get("parameterType");
                            JsonNode pvNode = node.get("args");
                            Object result = null;
                            if (ptNode.isArray() && pvNode.isArray()) {
                                int length = ptNode.size();
                                Class[] paramTypes = new Class[length];
                                for (int i = 0; i < length; i++) {
                                    paramTypes[i] = Class.forName(ptNode.get(i).asText());
                                }
                                Object[] args = new Object[length];
                                for (int i = 0; i < length; i++) {
                                    args[i] = pvNode.get(i).isInt()
                                            ? Integer.valueOf(pvNode.get(i).asInt())
                                            : pvNode.get(i).asText();
                                }
                                int port = ((InetSocketAddress) client.getLocalAddress()).getPort();
                                Object service = map.get(port);
                                Method method = service.getClass().getMethod(methodName, paramTypes);
                                result = method.invoke(service, args);
                            }
                            sendBuffer.clear();
                            sendBuffer.put(result.toString().getBytes());
                            sendBuffer.flip();
                            client.write(sendBuffer);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                        client.close();
                        sk.cancel();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
To make debugging easier, the data is passed as JSON strings here.
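For example, a call to GreetingService.sayHello would travel roughly as the following JSON (the argument value is made up):

{"method":"sayHello","parameterType":["java.lang.String"],"args":["rick"]}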
4. InvocationProxy.java is modified as follows
package com.rick.archi.soa.nio_tcp_rpc;

import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.ObjectNode;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class InvocationProxy implements InvocationHandler {

    private String host;
    private int port;
    private final static int BLOCK = 4096;
    private ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
    private ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);

    public InvocationProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        Selector selector = Selector.open();
        sc.register(selector, SelectionKey.OP_CONNECT);
        sc.connect(new InetSocketAddress(host, port));

        Iterator<SelectionKey> iterator;
        SelectionKey selectionKey;
        SocketChannel client;
        String result = "";
        int count = 0;
        boolean finish = false;
        while (!finish) {
            selector.select();
            iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                selectionKey = iterator.next();
                if (selectionKey.isConnectable()) {
                    client = (SocketChannel) selectionKey.channel();
                    if (client.isConnectionPending()) {
                        client.finishConnect();
                        sendBuffer.clear();
                        ObjectMapper objectMapper = new ObjectMapper();
                        ObjectNode node = objectMapper.createObjectNode();
                        Class<?>[] clazz = method.getParameterTypes();
                        ArrayNode array1 = objectMapper.valueToTree(clazz);
                        ArrayNode array2 = objectMapper.valueToTree(args);
                        node.put("method", method.getName());
                        node.put("parameterType", array1);
                        node.put("args", array2);
                        String s = node.toString();
                        sendBuffer.put(s.getBytes());
                        sendBuffer.flip();
                        client.write(sendBuffer);
                    }
                    client.register(selector, SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    client = (SocketChannel) selectionKey.channel();
                    receiveBuffer.clear();
                    count = client.read(receiveBuffer);
                    if (count > 0) {
                        result = new String(receiveBuffer.array(), 0, count);
                    }
                    finish = true;
                    client.close();
                    selectionKey.cancel();
                }
                selector.selectedKeys().clear();
            }
        }
        return result;
    }
}
The running result is the same.
With NIO, a single thread can handle multiple channels, which improves efficiency considerably. But as you will have noticed, raw NIO code is rather verbose, so next we rewrite the code with Netty (3.x) to make it more concise and easier to follow.
1. Provider.java and Customer.java still stay unchanged
2. RpcFramework.java is rewritten as follows
public class RpcFramework {

    private static Map<String, ClientBootstrap> map = Maps.newHashMap();

    public static void publish(final Map<Integer, Object> map) throws Exception {
        ServerBootstrap server = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        ThreadPoolHelper.getExecutorInstance(),
                        ThreadPoolHelper.getExecutorInstance()
                )
        );
        server.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new ServerHandler(map));
            }
        });
        Set<Integer> ports = map.keySet();
        for (int port : ports) {
            server.bind(new InetSocketAddress(port));
        }
    }

    public static <T> T subscribe(final Class<T> interfaceClass, final String host, final int port) throws Exception {
        final SimpleChannelHandler handler = new ClientHandler();
        final String[] result = new String[1];
        ClientBootstrap client;
        if ((client = map.get("client")) == null) {
            client = new ClientBootstrap(
                    new NioClientSocketChannelFactory(
                            ThreadPoolHelper.getExecutorInstance(),
                            ThreadPoolHelper.getExecutorInstance()
                    )
            );
            map.put("client", client);
        }
        client.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(handler);
            }
        });
        ChannelFuture future = client.connect(new InetSocketAddress(host, port));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                synchronized (result) {
                    result.notify();
                }
            }
        });
        synchronized (result) {
            result.wait();
        }
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass}, (InvocationHandler) handler);
    }
}
In the subscribe method I keep the ClientBootstrap in a map because the ClientBootstrap that Netty works with must be an instance variable rather than a local one, while here the ClientBootstrap object also needs to be globally unique. Finally, a listener is added to the future so that once the client connection succeeds, subscribe stops blocking and returns the proxy.
3. The former WorkThread.java becomes ServerHandler.java:
public class ServerHandler extends SimpleChannelHandler {

    private Map<Integer, Object> map;

    public ServerHandler(Map<Integer, Object> map) {
        this.map = map;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("Exception occured...");
        e.getCause().printStackTrace();
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();
        String s = new String(buf.array(), "utf-8");
        ObjectMapper mapper = new ObjectMapper();
        JsonNode node = mapper.readTree(s);
        String methodName = node.get("method").asText();
        JsonNode ptNode = node.get("parameterType");
        JsonNode pvNode = node.get("args");
        Object result = null;
        if (ptNode.isArray() && pvNode.isArray()) {
            int length = ptNode.size();
            Class[] paramTypes = new Class[length];
            for (int i = 0; i < length; i++) {
                paramTypes[i] = Class.forName(ptNode.get(i).asText());
            }
            Object[] args = new Object[length];
            for (int i = 0; i < length; i++) {
                args[i] = pvNode.get(i).isInt()
                        ? Integer.valueOf(pvNode.get(i).asInt())
                        : pvNode.get(i).asText();
            }
            int port = ((InetSocketAddress) e.getChannel().getLocalAddress()).getPort();
            Object service = map.get(port);
            Method method = service.getClass().getMethod(methodName, paramTypes);
            result = method.invoke(service, args);
        }
        ChannelFuture future = e.getChannel().write(
                ChannelBuffers.wrappedBuffer(((String) result).getBytes("utf-8")));
        future.sync();
    }
}
4. The former InvocationProxy.java becomes ClientHandler.java:
public class ClientHandler extends SimpleChannelHandler implements InvocationHandler {

    private Channel channel;
    private String[] result = new String[1];

    @Override
    public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
        this.sendMessage(method, args);
        synchronized (result) {
            result.wait();
        }
        return result[0];
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("Client exception...");
        e.getCause().printStackTrace();
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.channel = e.getChannel();
    }

    public void sendMessage(Method method, Object[] args) throws Exception {
        if (channel != null && channel.isConnected()) {
            ObjectMapper objectMapper = new ObjectMapper();
            ObjectNode node = objectMapper.createObjectNode();
            Class<?>[] clazz = method.getParameterTypes();
            ArrayNode array1 = objectMapper.valueToTree(clazz);
            ArrayNode array2 = objectMapper.valueToTree(args);
            node.put("method", method.getName());
            node.put("parameterType", array1);
            node.put("args", array2);
            String s = node.toString();
            ChannelFuture future = channel.write(ChannelBuffers.wrappedBuffer(s.getBytes("utf-8")));
            future.sync();
        }
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();
        String s = new String(buf.array(), "utf-8");
        synchronized (result) {
            this.result[0] = s;
            result.notify();
        }
    }
}
When a method on the proxy is called, sendMessage sends the method name and arguments to the server, and messageReceived receives the result the server sends back. Because this happens asynchronously, the invoke method calls result.wait() and only returns the result once messageReceived has received it.
We have fixed quite a few problems along the way, but one important piece is still missing: service registration and subscription. In a real deployment there will certainly be multiple servers providing the same service; their information needs to be registered in a registry, and the service consumer has to fetch the full list of providers from the registry and then pick one of them according to some load-balancing algorithm. So at this point we bring in ZooKeeper and use ZkClient as its client.
1. Provider.java is changed as follows:
map.put(3456, service);
map.put(3457, service);
This time there are two providers of GreetingService, on ports 3456 and 3457.
2. Customer.java is changed as follows:
GreetingService service = RpcFramework.getInstance().subscribe(GreetingService.class);
CalculateService calService = RpcFramework.getInstance().subscribe(CalculateService.class);
We no longer need to pass a port when calling subscribe, because we now pick one ourselves from the service ports registered in ZooKeeper.
3. RpcFramework.java is changed as follows:
public class RpcFramework {

    private static final String ROOT = "/service";

    private ZkClient client;

    private RpcFramework() {
        client = new ZkClient("127.0.0.1");
    }

    private static RpcFramework instance = new RpcFramework();

    public static RpcFramework getInstance() {
        return instance;
    }

    private static Map<String, ClientBootstrap> map = Maps.newHashMap();

    public void publish(final Map<Integer, Object> map) throws Exception {
        ServerBootstrap server = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        ThreadPoolHelper.getExecutorInstance(),
                        ThreadPoolHelper.getExecutorInstance()
                )
        );
        server.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new ServerHandler(map));
            }
        });
        Set<Integer> ports = map.keySet();
        for (int port : ports) {
            server.bind(new InetSocketAddress(port));
            Class[] interfaces = map.get(port).getClass().getInterfaces();
            String name = interfaces[0].getSimpleName();
            // register
            client.createPersistent(ROOT + "/" + name + "/" + port, true);
        }
    }

    public <T> T subscribe(final Class<T> interfaceClass) throws Exception {
        String host = "127.0.0.1";
        int port;
        String name = interfaceClass.getSimpleName();
        List<String> ports = client.getChildren(ROOT + "/" + name);
        if (ports.size() == 1) {
            port = Integer.valueOf(ports.get(0));
        } else {
            Random r = new Random();
            int index = r.nextInt(ports.size());
            port = Integer.valueOf(ports.get(index));
        }
        System.out.println("------------------" + port);

        final SimpleChannelHandler handler = new ClientHandler();
        final String[] result = new String[1];
        ClientBootstrap client;
        if ((client = map.get("client")) == null) {
            client = new ClientBootstrap(
                    new NioClientSocketChannelFactory(
                            ThreadPoolHelper.getExecutorInstance(),
                            ThreadPoolHelper.getExecutorInstance()
                    )
            );
            map.put("client", client);
        }
        client.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(handler);
            }
        });
        ChannelFuture future = client.connect(new InetSocketAddress(host, port));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                synchronized (result) {
                    result.notify();
                }
            }
        });
        synchronized (result) {
            result.wait();
        }
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass}, (InvocationHandler) handler);
    }
}
The class has been turned into a singleton so that the ZkClient connection is established when it is initialized. At the end of the publish method, zkClient's createPersistent is called to register each service; in the subscribe method, zkClient's getChildren returns the list of providers, and one of them is chosen at random to serve the call.
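With the Provider above, and assuming CalculateService is still published on port 6543 as before, the znode tree ends up looking roughly like this:

/service
    /GreetingService
        /3456
        /3457
    /CalculateService
        /6543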
Of course there is still plenty of room for improvement, for example adding listeners for session or state changes, but that is not the focus here, so I will leave it out. Even so, it is easy to see that building an SOA framework like dubbo yourself involves a great many details that have to be thought through.