本文展示一个Storm的topology,该topology对给定的词源进行词频统计,然后存入HBase,该实例不借助storm-hbase包,而是直接使用hbase client来完成对HBase的操作。
由Twitter开源的、分布式实时计算系统 Apache Storm ,如今已被多家知名企业应用于实时分析、流式计算、在线机器学习、分布式RPC调用、ETL等领域,甚至有看到“Storm之于实时计算,就像Hadoop之于数据批处理”这样的评价,是否言过其实,这里暂且不论,但至少已经看到业界对Storm在实时计算领域的肯定,加之其开源特性,必然会得到更广泛的应用。
在Storm的实际应用中,在topology中将经过处理的数据通过 HBase 进行持久化,是一个常见的需求。Storm官方提供了storm-hbase,包含一些比较通用的API及其简单实现,可以查看对应的官方文档来了解基本使用方法: storm-hbase 。但如果你需要进行一些更复杂的处理,或者希望对自己的代码有更多的掌控,那么脱离storm-hbase,直接使用HBase的Java API来完成操作,将是一个不错的选择。本文将展示的,就是一个在Storm的topology中直接使用HBase Java API操作HBase的简单示例。
本项目数据源部分直接借用Storm词频统计的官方示例,在WordSpout.java中从静态字符串数组中读取单词,在WordCounterBolt.java中统计单词出现的次数,最后在MyHBaseBolt.java中将单词及其出现的次数写入到HBase。
示例的测试环境:
示例直接使用hbase client操作HBase,因此关键的依赖只有storm和hbase client,项目pom.xml:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <storm.version>1.0.1</storm.version> <!-- 开发调试时配置为compile,topology打包时配置为provided --> <storm.scope>compile</storm.scope> </properties> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <scope>${storm.scope}</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.2</version> </dependency> </dependencies>
项目结构:
--src --main --java --bolt --MyHBaseBolt.java --WordCounterBolt.java --spout --WordSpout.java --HBaseTopology.java --resources --hbase-site.xml
其中hbase-site.xml直接使用HBase服务器上面的hbase-site.xml即可。本示例的HBase集群使用独立的zookeeper集群,zk的端口使用了默认端口,因此不需要在hbase-site.xml中显式配置,详细内容见附录。
这部分直接借用一个Storm官方示例:WordSpout.java从静态数组中随机读取单词并向外发射,WordCounterBolt接收来自WordSpout的包含一个个单词的tuple,对每个单词出现的次数进行统计,然后将每个单词及其对应的计数向外发射。为快速进入主题,这部分代码放在附录中。
在java中通过hbase client对hbase进行读写大体有如下步骤:
Configuration config = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(config);
Table table = connection.getTable(TableName.valueOf("WordCount"));
Put p = new Put(Bytes.toBytes("key"));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("words"), Bytes.toBytes("word"));
在Storm的bolt中进行实际应用:
public class MyHBaseBolt extends BaseBasicBolt { private Connection connection; private Table table; @Override public void prepare(Map stormConf, TopologyContext context) { Configuration config = HBaseConfiguration.create(); try { connection = ConnectionFactory.createConnection(config); //示例都是对同一个table进行操作,因此直接将Table对象的创建放在了prepare,在bolt执行过程中可以直接重用。 table = connection.getTable(TableName.valueOf("WordCount")); } catch (IOException e) { //do something to handle exception } } @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { //从tuple中获取单词 String word = tuple.getString(0); //从tuple中获取计数,这里转换为String只是为了示例运行后存入hbase的计数值能够直观显示。 String count = tuple.getInteger(1).toString(); try { //以各个单词作为row key Put put = new Put(Bytes.toBytes(word)); //将被计数的单词写入cf:words列 put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("words"), Bytes.toBytes(word)); //将单词的计数写入cf:counts列 put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("counts"), Bytes.toBytes(count)); table.put(put); } catch (IOException e) { //do something to handle exception } } @Override public void cleanup() { //关闭table try { if(table != null) table.close(); } catch (Exception e){ //do something to handle exception } finally { //在finally中关闭connection try { connection.close(); } catch (IOException e) { //do something to handle exception } } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //示例中本bolt不向外发射数据,所以没有再做声明 } }
虽然可能应用场景相对较少,但还是附带介绍一下从HBase读取数据:
Get get = new Get(Bytes.toBytes("key"));
Result r = table.get(get);
byte[] value = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("words"));
也可以使用scan来批量读取,Scanner实现了Iterable,因此可以使用foreach来进行遍历:
Scan scan = new Scan(); //获取指定列族所有列的数据 scan.addFamily(Bytes.toBytes("cf")); ResultScanner scanner = table.getScanner(scan); try { for (Result r : scanner) {...} }finally{ scanner.close(); }
topology中唯一需要注意的是,在Windows测试该示例时,需要配置hadoop.home.dir属性,并确保将winutils.exe客户端( 示例中使用的版本(链接若失效请自助) )放置在所配置的hadoop.home.dir目录下(资料解释:在hadoop 2.x版本的包中不再包含winutils.exe文件)。
HBaseTopology.java:
public class PersistentWordCount { private static final String WORD_SPOUT = "WORD_SPOUT"; private static final String COUNT_BOLT = "COUNT_BOLT"; private static final String HBASE_BOLT = "HBASE_BOLT"; public static void main(String[] args) throws Exception { System.setProperty("hadoop.home.dir","E:/BaiduYunDownload"); Config config = new Config(); WordSpout spout = new WordSpout(); WordCounter bolt = new WordCounter(); MyHBaseBolt hbase = new MyHBaseBolt(); // wordSpout ==> countBolt ==> HBaseBolt TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(WORD_SPOUT, spout, 1); builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); builder.setBolt(HBASE_BOLT, hbase, 10).fieldsGrouping(COUNT_BOLT, new Fields("word")); if (args.length == 0) { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word", config, builder.createTopology()); Thread.sleep(10000); cluster.killTopology("word"); cluster.shutdown(); System.exit(0); } else { config.setNumWorkers(3); StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } }
如果编译遇到类似: java.io.IOException: No FileSystem for scheme: hdfs
这样关于hadoop的问题,可能需要添加hadoop相关依赖包,如:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.4</version> </dependency>
本文通过一个词频统计后通过HBase进行结果持久化的topology示例,展示了如何在Storm的中直接使用HBase的java api来实现基本的读写操作,希望能为想自己完成Storm的HBase集成而不得其法的朋友提供一个入门指引。
WordSpout.java:
public class WordSpout extends BaseRichSpout { private SpoutOutputCollector collector; private static final String[] MSGS = new String[]{ "Storm", "HBase", "Integration", "example", "by ", "aloo", "in", "Aug", }; private static final Random random = new Random(); @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { String word = MSGS[random.nextInt(8)]; collector.emit(new Values(word)); } }
WordCounterBolt.java:
public class WordCounter extends BaseBasicBolt { private Map<String, Integer> _counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); int count; if(_counts.containsKey(word)){ count = _counts.get(word); } else { count = 0; } count ++; _counts.put(word, count); collector.emit(new Values(word, count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hbase.rootdir</name> <value>hdfs://xxx.xx.xx.xx:9000/hbase</value> </property> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/home/hadoop/hbase/storm/zookeeper</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>zknode1,zdnode2,zknode3</value> </configuration>