通过HA访问Hdfs的时候如何获取到活跃节点是一个稍稍有些麻烦的事情。
目前使用过两种方案:一是通过webhdfs接口逐一访问测试,找到状态为可用的节点;一是在zookeeper上直接获取当前活跃的节点。
简单说下第二种方案。ha的ActiveNode在zookeeper上的存储节点为:/hadoop-ha/dcnameservice/ActiveStandbyElectorLock。只需要通过ZooKeeper的API监听获取这个节点的信息即可。不过这个节点保存的信息不能当做字符串来读取,它是一个序列化后的对象,需要反序列化才能使用。
实现的代码如下:
import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.io.IOException; import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged; public class ZooKeeperClient { private static ZooKeeperzk; private static Statstat = new Stat(); private static String HA_ACTIVE_NODE = ""; static { try { zk = new ZooKeeper(HDFS_HA_ZK_ADDRESS, 50000, new Watcher() { @Override public void process(WatchedEventevent) { if (event.getType() == NodeDataChanged) { update(); System.out.println("HA Address changed......"); } } }); } catch (IOException e) { throw new RuntimeException(e); } update(); } private static void update() { try { byte[] bytes = zk.getData(HDFS_HA_ZK_NODE, true, stat); HAZKInfoProtos.ActiveNodeInfonodeInfo = HAZKInfoProtos.ActiveNodeInfo.parseFrom(bytes); HA_ACTIVE_NODE = nodeInfo.getHostname(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } public static String getActiveNode() { return HA_ACTIVE_NODE; } public static void main(String[] args) { System.out.println(getActiveNode()); } }
就这样。