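In the earlier post 《Hadoop2源码分析-RPC机制初识》 we took a first look at the RPC mechanism. This post explores the Hadoop V2 RPC mechanism in more depth. Studying it requires some Java fundamentals, such as Java NIO, dynamic proxies, and reflection. This post covers Java NIO, dynamic proxies and reflection, and a usage example of the Hadoop V2 RPC framework.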
Let's get started.
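Java NIO, also called Java New IO, is an alternative to the standard Java IO API and offers a different way of doing IO. Its core components are channels, buffers, and selectors.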
The following figure shows how Java NIO works:
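Before reading the server and client code, it may help to see how a buffer is written and then read, since every channel read and write goes through one. The following is only a minimal sketch: the class name BufferDemo and the method roundTrip are made up for illustration and are not part of the original project.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only
class BufferDemo {

    // Writes text into a buffer, flips it, and decodes only the bytes that were written
    static String roundTrip(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put(text.getBytes(StandardCharsets.UTF_8)); // write mode: position advances
        buffer.flip(); // read mode: limit = old position, position = 0
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data); // copy out exactly the written bytes
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello nio"));
    }
}
```

Calling flip() before reading is what limits the read to the bytes actually written, so the unused tail of the 1024-byte array is never decoded.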
First, let's look at the NIOServer code:
package cn.hadoop.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.conf.ConfigureAPI;

/**
 * @Date May 8, 2015
 *
 * @Author dengjie
 *
 * @Note Defined nio server
 */
public class NIOServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);

    // The channel manager
    private Selector selector;

    /**
     * Get the ServerSocket channel and initialize it
     *
     * 1. Get a ServerSocket channel
     *
     * 2. Set the channel to non-blocking mode
     *
     * 3. Bind the channel's ServerSocket to the given port
     *
     * 4. Get a channel manager (selector)
     *
     * 5. Register the channel with the selector for the
     * SelectionKey.OP_ACCEPT event
     *
     * @param port
     * @throws IOException
     */
    public void init(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
        this.selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Listen on the selector
     *
     * @throws IOException
     */
    public void listen() throws IOException {
        LOGGER.info("Server has start success");
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false); // non-blocking
                    channel.write(ByteBuffer.wrap("Send test info to client".getBytes()));
                    channel.register(this.selector, SelectionKey.OP_READ); // register interest in read events
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    /**
     * Handle data sent by the client
     */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int count = channel.read(buffer);
        if (count < 0) {
            // The client closed the connection; closing the channel also cancels its key
            channel.close();
            return;
        }
        // Decode only the bytes that were actually read
        String info = new String(buffer.array(), 0, count).trim();
        LOGGER.info("Server receive info : " + info);
        ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());
        channel.write(outBuffer); // echo the message back to the client
    }

    public static void main(String[] args) {
        try {
            NIOServer server = new NIOServer();
            server.init(ConfigureAPI.ServerAddress.NIO_PORT);
            server.listen();
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("NIOServer main run error,info is " + ex.getMessage());
        }
    }
}
Next, let's look at the NIOClient code:
package cn.hadoop.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.conf.ConfigureAPI;

/**
 * @Date May 8, 2015
 *
 * @Author dengjie
 *
 * @Note Defined NIO client
 */
public class NIOClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(NIOClient.class);

    private Selector selector;

    /**
     * Get a Socket channel and initialize it
     */
    public void init(String ip, int port) throws Exception {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        this.selector = Selector.open();
        channel.connect(new InetSocketAddress(ip, port));
        channel.register(selector, SelectionKey.OP_CONNECT);
    }

    /**
     * Listen on the selector
     */
    public void listen() throws Exception {
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (channel.isConnectionPending()) {
                        channel.finishConnect();
                    }
                    // The channel was already set to non-blocking mode in init()
                    channel.write(ByteBuffer.wrap("Send test info to server".getBytes()));
                    channel.register(this.selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    /**
     * Handle data sent by the server
     */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int count = channel.read(buffer);
        if (count < 0) {
            // The server closed the connection; closing the channel also cancels its key
            channel.close();
            return;
        }
        // Decode only the bytes that were actually read
        String info = new String(buffer.array(), 0, count).trim();
        LOGGER.info("Client receive info : " + info);
        ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());
        channel.write(outBuffer);
    }

    public static void main(String[] args) {
        try {
            NIOClient client = new NIOClient();
            client.init(ConfigureAPI.ServerAddress.NIO_IP, ConfigureAPI.ServerAddress.NIO_PORT);
            client.listen();
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("NIOClient main run has error,info is " + ex.getMessage());
        }
    }
}
The ConfigureAPI class is shown below:
package cn.hadoop.conf;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Defined rpc info
 */
public class ConfigureAPI {

    public interface VersionID {
        public static final long RPC_VERSION = 7788L;
    }

    public interface ServerAddress {
        public static final int NIO_PORT = 8888;
        public static final String NIO_IP = "127.0.0.1";
    }
}
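The select loop used by NIOServer and NIOClient can also be tried in a single process without opening a port. The sketch below rests on a substitution: it replaces the socket channels with a java.nio.channels.Pipe, whose source end is also a selectable channel. PipeSelectorDemo and sendThroughPipe are hypothetical names, and the message is assumed to be short and non-empty.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class: a Pipe stands in for the client/server sockets
class PipeSelectorDemo {

    // Writes a short, non-empty message into the pipe and reads it back via a selector
    static String sendThroughPipe(String message) {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false); // required before registering
                source.register(selector, SelectionKey.OP_READ);

                sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                StringBuilder received = new StringBuilder();
                while (received.length() == 0) {
                    selector.select(); // blocks until the source end is readable
                    Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
                    while (ite.hasNext()) {
                        SelectionKey key = ite.next();
                        ite.remove(); // selected keys must be removed by hand
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            int count = ((Pipe.SourceChannel) key.channel()).read(buffer);
                            if (count > 0) {
                                buffer.flip();
                                received.append(StandardCharsets.UTF_8.decode(buffer));
                            }
                        }
                    }
                }
                return received.toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendThroughPipe("ping"));
    }
}
```

The loop has the same shape as listen() above: select, iterate over the selected keys, remove each key, then dispatch on the kind of event.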
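In Java, dynamic proxies are mainly used to enhance methods: behavior can be added to a method without modifying its source code. They are also used for remote calls. Suppose there is a Java interface whose implementation is deployed on a remote server. Client code cannot instantiate that implementation directly, and this is where a dynamic proxy comes in.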
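Reflection is built on the Class class, which is the entry point for instantiating objects reflectively. Calling a class's constructors and ordinary methods and accessing its fields can all be done through the reflection mechanism. The following examples make this concrete.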
package cn.java.base;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 */
public class JProxy {

    public static void main(String[] args) {
        JInvocationHandler ji = new JInvocationHandler();
        Subject sub = (Subject) ji.bind(new RealSubject());
        System.out.println(sub.say("dengjie", 25));
    }
}

interface Subject {
    public String say(String name, int age);
}

class RealSubject implements Subject {

    @Override
    public String say(String name, int age) {
        return name + "," + age;
    }
}

class JInvocationHandler implements InvocationHandler {

    private Object object = null;

    // Create a proxy that implements the same interfaces as the target object
    public Object bind(Object object) {
        this.object = object;
        return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), this);
    }

    // Every call on the proxy arrives here and is forwarded to the target object
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object tmp = method.invoke(this.object, args);
        return tmp;
    }
}
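JProxy only forwards the call. To illustrate method enhancement, here is a minimal sketch that records each method name before delegating. Greeter, PlainGreeter, EnhancingProxyDemo, and traced are all hypothetical names introduced for this example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface and implementation, for illustration only
interface Greeter {
    String greet(String name);
}

class PlainGreeter implements Greeter {
    @Override
    public String greet(String name) {
        return "hello, " + name;
    }
}

class EnhancingProxyDemo {

    // Names of the methods that went through the proxy
    static final List<String> CALLS = new ArrayList<>();

    // Wraps target in a proxy that records each call before delegating to it
    static <T> T traced(T target, Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            CALLS.add(method.getName()); // the added behavior
            try {
                return method.invoke(target, args); // the original behavior
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow what the target itself threw
            }
        };
        Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface }, handler);
        return iface.cast(proxy);
    }

    public static void main(String[] args) {
        Greeter greeter = traced(new PlainGreeter(), Greeter.class);
        System.out.println(greeter.greet("dengjie"));
        System.out.println(CALLS);
    }
}
```

PlainGreeter is not modified. The extra behavior lives entirely in the handler.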
package cn.java.base;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 */
public class JReflect {

    public static void main(String[] args) {
        Fruit f = Factory.getInstance(Orange.class.getName());
        if (f != null) {
            f.eat();
        }
    }
}

interface Fruit {
    public abstract void eat();
}

class Apple implements Fruit {

    @Override
    public void eat() {
        System.out.println("apple");
    }
}

class Orange implements Fruit {

    @Override
    public void eat() {
        System.out.println("orange");
    }
}

class Factory {

    // Instantiate a Fruit from its class name through reflection
    public static Fruit getInstance(String className) {
        Fruit f = null;
        try {
            // Class.newInstance() is deprecated since Java 9, so go through the no-arg constructor
            f = (Fruit) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return f;
    }
}
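Put together, a dynamic proxy on the client side and reflective dispatch on the server side give the basic shape of an RPC call. The sketch below is a loose in-process illustration, not how Hadoop implements it: there is no network or serialization, and Calculator, CalculatorImpl, MiniRpc, dispatch, and getProxy are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical protocol interface and server-side implementation
interface Calculator {
    int add(int a, int b);

    int sub(int a, int b);
}

class CalculatorImpl implements Calculator {
    @Override
    public int add(int a, int b) {
        return a + b;
    }

    @Override
    public int sub(int a, int b) {
        return a - b;
    }
}

class MiniRpc {

    // "Server side": find the method by name and parameter types, then invoke it reflectively
    static Object dispatch(Object service, Class<?> protocol, String methodName,
            Class<?>[] paramTypes, Object[] args) throws Exception {
        Method method = protocol.getMethod(methodName, paramTypes);
        return method.invoke(service, args);
    }

    // "Client side": a proxy that turns each call into (method name, parameter types, arguments).
    // A real RPC framework would serialize these and send them over a socket.
    static <T> T getProxy(Class<T> protocol, Object service) {
        InvocationHandler handler = (proxy, method, args) ->
                dispatch(service, protocol, method.getName(), method.getParameterTypes(), args);
        Object proxy = Proxy.newProxyInstance(protocol.getClassLoader(), new Class<?>[] { protocol }, handler);
        return protocol.cast(proxy);
    }

    public static void main(String[] args) {
        Calculator calc = getProxy(Calculator.class, new CalculatorImpl());
        System.out.println("2+3=" + calc.add(2, 3)); // prints 2+3=5
        System.out.println("5-2=" + calc.sub(5, 2)); // prints 5-2=3
    }
}
```

The caller only ever sees the Calculator interface, which is why the client never needs to instantiate the implementation class.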
This example uses the Hadoop V2 RPC framework to implement Add and Sub for two integers. The service interface is CaculateService, which extends VersionedProtocol. The code is as follows:
package cn.hadoop.service;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;

import cn.hadoop.conf.ConfigureAPI;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Data calculate service interface
 */
@ProtocolInfo(protocolName = "", protocolVersion = ConfigureAPI.VersionID.RPC_VERSION)
public interface CaculateService extends VersionedProtocol {

    // defined add function
    public IntWritable add(IntWritable arg1, IntWritable arg2);

    // defined sub function
    public IntWritable sub(IntWritable arg1, IntWritable arg2);
}
Note that this project uses Hadoop 2.6.0, and the CaculateService interface needs the @ProtocolInfo annotation to declare its version number.
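An annotation like this is read back at runtime through reflection. The sketch below shows the general technique using only the JDK. It assumes, rather than reproduces, what RPC.getProtocolVersion() does with @ProtocolInfo, and DemoProtocolInfo, DemoCalcProtocol, and AnnotationVersionDemo are hypothetical stand-ins that are not part of Hadoop.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for Hadoop's @ProtocolInfo; RUNTIME retention makes it visible to reflection
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoProtocolInfo {
    String protocolName();

    long protocolVersion() default -1;
}

// Hypothetical protocol interface carrying the version annotation
@DemoProtocolInfo(protocolName = "demo.CalcProtocol", protocolVersion = 7788L)
interface DemoCalcProtocol {
}

class AnnotationVersionDemo {

    // Reads the declared version from the annotation on the protocol interface
    static long versionOf(Class<?> protocol) {
        DemoProtocolInfo info = protocol.getAnnotation(DemoProtocolInfo.class);
        if (info == null) {
            throw new IllegalArgumentException("No version annotation on " + protocol.getName());
        }
        return info.protocolVersion();
    }

    public static void main(String[] args) {
        System.out.println(versionOf(DemoCalcProtocol.class)); // prints 7788
    }
}
```

Because the version is attached to the interface itself, client and server can both derive it from the same class without passing it around separately.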
The CaculateServiceImpl class implements the CaculateService interface. The code is as follows:
package cn.hadoop.service.impl;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.ProtocolSignature;

import cn.hadoop.conf.ConfigureAPI;
import cn.hadoop.service.CaculateService;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Implements CaculateService class
 */
public class CaculateServiceImpl implements CaculateService {

    /**
     * Return the protocol signature. Delegate to the static helper in
     * ProtocolSignature; calling this.getProtocolSignature() here would
     * recurse until the stack overflows.
     */
    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
            throws IOException {
        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
    }

    /**
     * Check the corresponding version
     */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return ConfigureAPI.VersionID.RPC_VERSION;
    }

    /**
     * Add nums
     */
    @Override
    public IntWritable add(IntWritable arg1, IntWritable arg2) {
        return new IntWritable(arg1.get() + arg2.get());
    }

    /**
     * Sub nums
     */
    @Override
    public IntWritable sub(IntWritable arg1, IntWritable arg2) {
        return new IntWritable(arg1.get() - arg2.get());
    }
}
The CaculateServer class exposes the service to clients. The code is as follows:
package cn.hadoop.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.service.CaculateService;
import cn.hadoop.service.impl.CaculateServiceImpl;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note Server Main
 */
public class CaculateServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(CaculateServer.class);

    public static final int IPC_PORT = 9090;

    public static void main(String[] args) {
        try {
            // Build the RPC server for the protocol and bind it to the local address
            Server server = new RPC.Builder(new Configuration())
                    .setProtocol(CaculateService.class)
                    .setBindAddress("127.0.0.1")
                    .setPort(IPC_PORT)
                    .setInstance(new CaculateServiceImpl())
                    .build();
            server.start();
            LOGGER.info("CaculateServer has started");
            // Block on stdin so the main thread stays alive
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("CaculateServer server error,message is " + ex.getMessage());
        }
    }
}
Note that in Hadoop V2 the Server object can no longer be obtained with RPC.getServer(). That method has been removed, and a new Server object is now built with RPC.Builder instead.
The RPCClient class is the client used to call the server. The code is as follows:
package cn.hadoop.rpc;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.service.CaculateService;

/**
 * @Date May 7, 2015
 *
 * @Author dengjie
 *
 * @Note RPC Client Main
 */
public class RPCClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(RPCClient.class);

    public static void main(String[] args) {
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", CaculateServer.IPC_PORT);
        CaculateService service = null;
        try {
            // Get a client-side proxy for the remote service
            service = RPC.getProxy(CaculateService.class, RPC.getProtocolVersion(CaculateService.class), addr,
                    new Configuration());
            int add = service.add(new IntWritable(2), new IntWritable(3)).get();
            int sub = service.sub(new IntWritable(5), new IntWritable(2)).get();
            LOGGER.info("2+3=" + add);
            LOGGER.info("5-2=" + sub);
        } catch (Exception ex) {
            ex.printStackTrace();
            LOGGER.error("Client has error,info is " + ex.getMessage());
        } finally {
            // Release the connection held by the proxy
            if (service != null) {
                RPC.stopProxy(service);
            }
        }
    }
}
A screenshot of the Hadoop V2 RPC server output is shown below:
A screenshot of the Hadoop V2 RPC client output is shown below:
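The Hadoop V2 RPC framework wraps socket communication and defines its own base interface, VersionedProtocol. The framework transfers objects over the network in serialized form. For Hadoop V2 serialization, see 《Hadoop2源码分析-序列化篇》. Objects serialized the traditional way are comparatively large. Internally, the framework implements its own server and client objects. The server object is obtained with new RPC.Builder(conf).build(), and the client object with RPC.getProxy(). Both take a Configuration object, which holds the settings from Hadoop's configuration files.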
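The size difference can be observed with the JDK alone. The sketch below compares writing an int through DataOutputStream.writeInt(), which is what IntWritable.write() does with its value, against standard Java serialization of an Integer. SerializationSizeDemo and its method names are hypothetical, and the exact size of the Java-serialized form is left unstated on purpose.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical demo class comparing two ways of serializing one int
class SerializationSizeDemo {

    // Writable-style: only the 4 raw bytes of the int are written
    static int writableStyleSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Standard Java serialization: stream header and class metadata come along with the value
    static int javaSerializationSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(Integer.valueOf(value));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("Writable-style bytes: " + writableStyleSize(5));
        System.out.println("Java serialization bytes: " + javaSerializationSize(5));
    }
}
```

The first form carries no type information, so the reader has to know in advance which type to expect. That trade-off is what keeps it compact.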
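That's all for this post. If you run into any problems while studying this material, you can join the discussion group or send me an email, and I will do my best to answer. Let's keep learning together!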