转载

Hadoop2源码分析-RPC探索实战

1.概述

在《 Hadoop2源码分析-RPC机制初识 》博客中,我们对RPC机制有了初步的认识和了解,下面我们对Hadoop V2的RPC机制做进一步探索,在研究Hadoop V2的RPC机制,我们需要掌握相关的Java基础知识,如:Java NIO、动态代理与反射等。本篇博客介绍的内容目录如下所示:

  • Java NIO简述
  • Java NIO实例演示
  • 动态代理与反射简述
  • 动态代理与反射实例演示
  • Hadoop V2 RPC框架使用实例

下面开始今天的博客介绍。

2.Java NIO简述

Java NIO又称Java New IO,它替代了Java IO API,提供了与标准IO不同的IO工作方式。Java NIO由一下核心组件组成:

  • Channels:连接通道,即能从通道读取数据,又能写数据到通道。可以异步读写,读写从Buffer开始。
  • Buffers:消息缓冲区,用于和NIO通道进行交互。所谓缓冲区,它是一块可以读写的内存,该内存被封装成NIO的Buffer对象,并提供相应的方法,以便于访问。
  • Selectors:通道管理器,它能检测到Java NIO中多个通道,单独的线程可以管理多个通道,间接的管理多个网络连接。

下图为Java NIO的工作原理图,如下图所示:

Hadoop2源码分析-RPC探索实战

3.Java NIO实例演示

  • NIOServer

首先,我们来看NIOServer的代码块。代码内容如下所示:

package cn.hadoop.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hadoop.conf.ConfigureAPI; /**  * @Date May 8, 2015  *  * @Author dengjie  *  * @Note Defined nio server  */ public class NIOServer {  private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);  // The channel manager  private Selector selector;  /**   * Get ServerSocket channel and initialize   *    * 1.Get a ServerSocket channel   *    * 2.Set channel for non blocking   *    * 3.The channel corresponding to the ServerSocket binding to port port   *    * 4.Get a channel manager   *    * 5.The channel manager and the channel binding, and the channel registered   * SelectionKey.OP_ACCEPT event   *    * @param port   * @throws IOException   */  public void init(int port) throws IOException {   ServerSocketChannel serverChannel = ServerSocketChannel.open();   serverChannel.configureBlocking(false);   serverChannel.socket().bind(new InetSocketAddress(port));   this.selector = Selector.open();   serverChannel.register(selector, SelectionKey.OP_ACCEPT);  }  /**   * listen selector   *    * @throws IOException   */  public void listen() throws IOException {   LOGGER.info("Server has start success");   while (true) {    selector.select();    Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();    while (ite.hasNext()) {     SelectionKey key = (SelectionKey) ite.next();     ite.remove();     if (key.isAcceptable()) {      ServerSocketChannel server = (ServerSocketChannel) key.channel();      SocketChannel channel = server.accept();      channel.configureBlocking(false);// 非阻塞      channel.write(ByteBuffer.wrap(new String("Send test info to client").getBytes()));      channel.register(this.selector, SelectionKey.OP_READ);// 设置读的权限     } else if (key.isReadable()) {      read(key);     }    }   }  }  /**   * Deal client send event   */  public void read(SelectionKey key) throws IOException {   SocketChannel channel = (SocketChannel) key.channel();   ByteBuffer buffer = ByteBuffer.allocate(1024);   channel.read(buffer);   byte[] data = buffer.array();   String info = new String(data).trim();   LOGGER.info("Server receive info : " + info);   ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());   channel.write(outBuffer);// 将消息回送给客户端  }  public static void main(String[] args) {   try {    NIOServer server = new NIOServer();    server.init(ConfigureAPI.ServerAddress.NIO_PORT);    server.listen();   } catch (Exception ex) {    ex.printStackTrace();    LOGGER.error("NIOServer main run error,info is " + ex.getMessage());   }  } } 
  • NIOClient

然后,我们在来看NIOClient的代码块,代码具体内容如下所示:

package cn.hadoop.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hadoop.conf.ConfigureAPI; /**  * @Date May 8, 2015  *  * @Author dengjie  *  * @Note Defined NIO client  */ public class NIOClient {  private static final Logger LOGGER = LoggerFactory.getLogger(NIOClient.class);  private Selector selector;  /**   * Get ServerSocket channel and initialize   */  public void init(String ip, int port) throws Exception {   SocketChannel channel = SocketChannel.open();   channel.configureBlocking(false);   this.selector = Selector.open();   channel.connect(new InetSocketAddress(ip, port));   channel.register(selector, SelectionKey.OP_CONNECT);  }  /**   * listen selector   */  public void listen() throws Exception {   while (true) {    selector.select();    Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();    while (ite.hasNext()) {     SelectionKey key = (SelectionKey) ite.next();     ite.remove();     if (key.isConnectable()) {      SocketChannel channel = (SocketChannel) key.channel();      if (channel.isConnectionPending()) {       channel.finishConnect();      }      channel.configureBlocking(false);// 非阻塞       channel.write(ByteBuffer.wrap(new String("Send test info to server").getBytes()));      channel.register(this.selector, SelectionKey.OP_READ);     } else if (key.isReadable()) {      read(key);     }    }   }  }  /**   * Deal client send event   */  public void read(SelectionKey key) throws IOException {   SocketChannel channel = (SocketChannel) key.channel();   ByteBuffer buffer = ByteBuffer.allocate(1024);   channel.read(buffer);   byte[] data = buffer.array();   String info = new String(data).trim();   LOGGER.info("Client receive info : " + info);   ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());   channel.write(outBuffer);  }  public static void main(String[] args) {   try {    NIOClient client = new NIOClient();    client.init(ConfigureAPI.ServerAddress.NIO_IP, ConfigureAPI.ServerAddress.NIO_PORT);    client.listen();   } catch (Exception ex) {    ex.printStackTrace();    LOGGER.error("NIOClient main run has error,info is " + ex.getMessage());   }  } } 
  • ConfigureAPI

下面给出ConfigureAPI类的代码,内容如下所示:

package cn.hadoop.conf;  /**  * @Date May 7, 2015  *  * @Author dengjie  *  * @Note Defined rpc info  */ public class ConfigureAPI {      public interface VersionID {         public static final long RPC_VERSION = 7788L;     }      public interface ServerAddress {         public static final int NIO_PORT = 8888;         public static final String NIO_IP = "127.0.0.1";     }  }

4.动态代理和反射简述

在Java中,动态代理主要用来做方法的增强,可以在不修改源码的情况下,增强一些方法。另外,还有一个作用就是做远程调用,比如现在有Java接口,该接口的实现部署在非本地服务器上,在编写客户端代码时,由于没法直接生成该对象,这个时候就需要考虑使用动态代理了。

而反射,利用了Class类作为反射实例化对象的基本应用,对于一个实例化对象而言,它需要调用类中的构造方法,属性和一般方法,这些操作都可以通过反射机制来完成。下面我们用一个实例来理解这些理论。

5.动态代理和反射实例演示

5.1动态代理

  • JProxy
package cn.java.base; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; /**  * @Date May 7, 2015  *   * @Author dengjie  */ public class JProxy {  public static void main(String[] args) {   JInvocationHandler ji = new JInvocationHandler();   Subject sub = (Subject) ji.bind(new RealSubject());   System.out.println(sub.say("dengjie", 25));  } } interface Subject {  public String say(String name, int age); } class RealSubject implements Subject {  @Override  public String say(String name, int age) {   return name + "," + age;  } } class JInvocationHandler implements InvocationHandler {  private Object object = null;  public Object bind(Object object) {   this.object = object;   return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), this);  }  @Override  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {   Object tmp = method.invoke(this.object, args);   return tmp;  } } 

5.2反射

  • JReflect
package cn.java.base; /**  * @Date May 7, 2015  *   * @Author dengjie  */ public class JReflect {  public static void main(String[] args) {   Fruit f = Factory.getInstance(Orange.class.getName());   if (f != null) {    f.eat();   }  } } interface Fruit {  public abstract void eat(); } class Apple implements Fruit {  @Override  public void eat() {   System.out.println("apple");  } } class Orange implements Fruit {  @Override  public void eat() {   System.out.println("orange");  } } class Factory {  public static Fruit getInstance(String className) {   Fruit f = null;   try {    f = (Fruit) Class.forName(className).newInstance();   } catch (Exception e) {    e.printStackTrace();   }   return f;  } } 

6.Hadoop V2 RPC框架使用实例

本实例主要演示通过Hadoop V2的RPC框架实现一个计算两个整数的Add和Sub,服务接口为 CaculateService ,继承与 VersionedProtocol ,具体代码如下所示:

  • CaculateService
package cn.hadoop.service;  import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.ipc.ProtocolInfo; import org.apache.hadoop.ipc.VersionedProtocol;  import cn.hadoop.conf.ConfigureAPI;  /**  * @Date May 7, 2015  *  * @Author dengjie  *  * @Note Data calculate service interface  */ @ProtocolInfo(protocolName = "", protocolVersion = ConfigureAPI.VersionID.RPC_VERSION) public interface CaculateService extends VersionedProtocol {      // defined add function     public IntWritable add(IntWritable arg1, IntWritable arg2);      // defined sub function     public IntWritable sub(IntWritable arg1, IntWritable arg2);  }

注意,本工程使用的是Hadoop-2.6.0版本,这里CaculateService接口需要加入注解,来声明版本号。

CaculateServiceImpl类实现CaculateService接口。代码如下所示:

  • CaculateServiceImpl
package cn.hadoop.service.impl;  import java.io.IOException;  import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.ipc.ProtocolSignature;  import cn.hadoop.conf.ConfigureAPI; import cn.hadoop.service.CaculateService;  /**  * @Date May 7, 2015  *  * @Author dengjie  *  * @Note Implements CaculateService class  */ public class CaculateServiceImpl implements CaculateService {      public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException {         return this.getProtocolSignature(arg0, arg1, arg2);     }      /**      * Check the corresponding version      */     public long getProtocolVersion(String arg0, long arg1) throws IOException {         return ConfigureAPI.VersionID.RPC_VERSION;     }      /**      * Add nums      */     public IntWritable add(IntWritable arg1, IntWritable arg2) {         return new IntWritable(arg1.get() + arg2.get());     }      /**      * Sub nums      */     public IntWritable sub(IntWritable arg1, IntWritable arg2) {         return new IntWritable(arg1.get() - arg2.get());     }  }

CaculateServer服务类,对外提供服务,具体代码如下所示:

  • CaculateServer
package cn.hadoop.rpc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; import org.slf4j.LoggerFactory; import org.slf4j.Logger; import cn.hadoop.service.CaculateService; import cn.hadoop.service.impl.CaculateServiceImpl; /**  * @Date May 7, 2015  *  * @Author dengjie  *  * @Note Server Main  */ public class CaculateServer {  private static final Logger LOGGER = LoggerFactory.getLogger(CaculateServer.class);  public static final int IPC_PORT = 9090;  public static void main(String[] args) {   try {    Server server = new RPC.Builder(new Configuration()).setProtocol(CaculateService.class)      .setBindAddress("127.0.0.1").setPort(IPC_PORT).setInstance(new CaculateServiceImpl()).build();    server.start();    LOGGER.info("CaculateServer has started");    System.in.read();   } catch (Exception ex) {    ex.printStackTrace();    LOGGER.error("CaculateServer server error,message is " + ex.getMessage());   }  } } 

注意,在Hadoop V2版本中,获取RPC下的Server对象不能在使用RPC.getServer()方法了,该方法已被移除,取而代之的是使用Builder方法来构建新的Server对象。

RPCClient客户端类,用于访问Server端,具体代码实现如下所示:

  • RPCClient
package cn.hadoop.rpc; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.ipc.RPC; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hadoop.service.CaculateService; /**  * @Date May 7, 2015  *  * @Author dengjie  *  * @Note RPC Client Main  */ public class RPCClient {  private static final Logger LOGGER = LoggerFactory.getLogger(RPCClient.class);  public static void main(String[] args) {   InetSocketAddress addr = new InetSocketAddress("127.0.0.1", CaculateServer.IPC_PORT);   try {    RPC.getProtocolVersion(CaculateService.class);    CaculateService service = (CaculateService) RPC.getProxy(CaculateService.class,      RPC.getProtocolVersion(CaculateService.class), addr, new Configuration());    int add = service.add(new IntWritable(2), new IntWritable(3)).get();    int sub = service.sub(new IntWritable(5), new IntWritable(2)).get();    LOGGER.info("2+3=" + add);    LOGGER.info("5-2=" + sub);   } catch (Exception ex) {    ex.printStackTrace();    LOGGER.error("Client has error,info is " + ex.getMessage());   }  } } 

Hadoop V2 RPC服务端截图预览,如下所示:

Hadoop2源码分析-RPC探索实战

Hadoop V2 RPC客户端截图预览,如下所示:

Hadoop2源码分析-RPC探索实战

7.总结

Hadoop V2 RPC框架对Socket通信进行了封装,定义了自己的基类接口VersionProtocol。该框架需要通过网络以序列化的方式传输对象,关于Hadoop V2的序列化可以参考《 Hadoop2源码分析-序列化篇 》,传统序列化对象较大。框架内部实现了基于Hadoop自己的服务端对象和客户端对象。服务端对象通过new RPC.Builder().builder()的方式来获取,客户端对象通过RPC.getProxy()的方式来获取。并且都需要接受Configuration对象,该对象实现了Hadoop相关文件的配置。

8.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

正文到此结束
Loading...