用实例理解Storm的Stream概念
事情源于在看基于Storm的CEP引擎: flowmix
的 FlowmixBuilder 代码,
每个Bolt设置了这么多的 Group ,
而且 declareStream 也声明了这么多的stream-id,
对于只写过 WordCountTopology 的小白而言,
直接懵逼了,没见过这么用的啊,我承认一开始是拒绝的,每个Bolt都设置了这么多Group,这TMD拓扑图是什么样的?
/**
 * Assembles the topology: the event, flow-loader and tick spouts, the initializer bolt,
 * and one bolt per flow operation (each wired through {@code declarebolt}).
 *
 * @return the builder with every spout and bolt registered
 */
public TopologyBuilder create() {
    // When eventLoaderParallelism is -1 the shared parallelismHint is used instead.
    int eventParallelism = (eventLoaderParallelism == -1) ? parallelismHint : eventLoaderParallelism;

    TopologyBuilder topology = new TopologyBuilder();

    // Sources.
    topology.setSpout(EVENT, (IRichSpout) eventsComponent, eventParallelism);
    topology.setSpout(FLOW_LOADER_STREAM, (IRichSpout) flowLoaderSpout, 1);
    topology.setSpout("tick", new TickSpout(1000), 1);

    // kicks off a flow determining where to start
    topology.setBolt(INITIALIZER, new FlowInitializerBolt(), parallelismHint)
            .localOrShuffleGrouping(EVENT)
            .allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM);

    // One bolt per flow operation; every one of them receives the same set of groupings.
    declarebolt(topology, FILTER, new FilterBolt(), parallelismHint, true);
    declarebolt(topology, SELECT, new SelectorBolt(), parallelismHint, true);
    declarebolt(topology, PARTITION, new PartitionBolt(), parallelismHint, true);
    declarebolt(topology, SWITCH, new SwitchBolt(), parallelismHint, true);
    declarebolt(topology, AGGREGATE, new AggregatorBolt(), parallelismHint, true);
    declarebolt(topology, JOIN, new JoinBolt(), parallelismHint, true);
    declarebolt(topology, EACH, new EachBolt(), parallelismHint, true);
    declarebolt(topology, SORT, new SortBolt(), parallelismHint, true);
    declarebolt(topology, SPLIT, new SplitBolt(), parallelismHint, true);
    declarebolt(topology, OUTPUT, outputBolt, parallelismHint, false);

    return topology;
}
/**
 * Registers {@code bolt} under {@code boltName} and subscribes it to the flow-loader and
 * tick streams plus the stream named {@code boltName} of the initializer and of every
 * flow-operation bolt.
 *
 * @param builder     topology builder the bolt is added to
 * @param boltName    component id of the bolt; also the stream-id it subscribes to upstream
 * @param bolt        bolt implementation
 * @param parallelism parallelism hint passed to {@code setBolt}
 * @param control     not read in this excerpt
 *                    (NOTE(review): presumably consumed by code omitted from the article)
 */
private static void declarebolt(TopologyBuilder builder, String boltName, IRichBolt bolt, int parallelism, boolean control) {
    builder.setBolt(boltName, bolt, parallelism)
            // broadcast streams: every task receives them
            .allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM)
            .allGrouping("tick", "tick")
            // data streams: each upstream component's stream named after this bolt
            .localOrShuffleGrouping(INITIALIZER, boltName)
            .localOrShuffleGrouping(FILTER, boltName)
            // guaranteed partitions will always group the same flow for flows that have joins with default partitions.
            .fieldsGrouping(PARTITION, boltName, new Fields(FLOW_ID, PARTITION))
            .localOrShuffleGrouping(AGGREGATE, boltName)
            .localOrShuffleGrouping(SELECT, boltName)
            .localOrShuffleGrouping(EACH, boltName)
            .localOrShuffleGrouping(SORT, boltName)
            .localOrShuffleGrouping(SWITCH, boltName)
            .localOrShuffleGrouping(SPLIT, boltName)
            .localOrShuffleGrouping(JOIN, boltName);
}
/**
 * Declares one output stream per flow operation, all sharing the same field layout.
 *
 * @param declarer declarer of the component whose outputs are being described
 * @param fields   field layout used for every declared stream
 */
public static void declareOutputStreams(OutputFieldsDeclarer declarer, Fields fields) {
    // Same ids, same order as a hand-written sequence of declareStream calls.
    String[] streamIds = {PARTITION, FILTER, SELECT, AGGREGATE, SWITCH, SORT, JOIN, SPLIT, EACH, OUTPUT};
    for (String streamId : streamIds) {
        declarer.declareStream(streamId, fields);
    }
}
先来复习下经典的WordCountTopology
/**
 * Classic word-count topology in which every component emits on the default stream:
 * spout -> split -> count -> print.
 */
public class WordCountTopologySimple {

    /** Emits one randomly chosen sentence per second on the default stream. */
    public static class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector collector;
        Random rand;
        String[] sentences = null;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            sentences = new String[]{
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"
            };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            String sentence = sentences[rand.nextInt(sentences.length)];
            // "\n" (newline), not the literal two characters "/n".
            System.out.println("\n" + sentence);
            this.collector.emit(new Values(sentence));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }

        @Override
        public void ack(Object id) {}

        @Override
        public void fail(Object id) {}
    }

    /** Splits each sentence on single spaces and emits one tuple per word. */
    public static class SplitSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
            this.collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * Keeps a running count per word and emits (word, count) after every update.
     *
     * <p>BaseBasicBolt has no prepare(Map, TopologyContext, OutputCollector) to override;
     * the collector is handed to execute() directly, so no collector field is needed.
     */
    public static class WordCountBolt extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /** Terminal bolt: prints "word,count" and emits nothing. */
    public static class PrinterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String first = tuple.getString(0);
            int second = tuple.getInteger(1);
            System.out.println(first + "," + second);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {}
    }

    /**
     * Submits the topology to a cluster under the name {@code args[0]} when an argument is
     * given; otherwise runs it in a local cluster for ten seconds.
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count");

        Config conf = new Config();
        conf.setDebug(false);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("word-count", conf, builder.createTopology());
                Thread.sleep(10000);
            } finally {
                // Shut the local cluster down even if submission fails or the sleep is interrupted.
                cluster.shutdown();
            }
        }
    }
}
默认情况下:Spout发送到下游Bolt的stream-id,以及Bolt发送到下游Bolt或者接收上游Spout/Bolt的stream-id都是 default。
可以对Spout/Bolt在发送消息时自定义stream-id,同时必须在声明输出字段时,指定对应的stream-id。
代码说明:发射时指定 一个 stream-id,声明流时指定 一个 stream-id,topology设置Bolt时除了通过Group的component-id,还会指定上游组件的stream-id
// Excerpt: only the members that change are shown; the collector, rand and sentences
// fields used below are not part of this excerpt.
class RandomSentenceSpout {
    /** Emits one random sentence per second on the custom stream "split-stream". */
    public void nextTuple() {
        Utils.sleep(1000);
        String sentence = sentences[rand.nextInt(sentences.length)];
        // "\n" (newline), not the literal two characters "/n".
        System.out.println("\n" + sentence);
        this.collector.emit("split-stream", new Values(sentence));
    }

    /** Declares the stream-id used by emit(); the two must match. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("sentence"));
    }
}
// Excerpt: only the members that change are shown; the collector field is not part of it.
class SplitSentenceBolt {
    /** Splits the incoming sentence on single spaces and emits each word on "count-stream". */
    public void execute(Tuple tuple) {
        for (String token : tuple.getStringByField("sentence").split(" ")) {
            this.collector.emit("count-stream", new Values(token));
        }
        this.collector.ack(tuple);
    }

    /** Declares "count-stream", the only stream this bolt emits on. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("word");
        declarer.declareStream("count-stream", schema);
    }
}
// Excerpt: only the members that change are shown; the counts map and collector are not part of it.
class WordCountBolt {
    /** Increments the running count of the word and emits (word, count) on "print-stream". */
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer seen = counts.get(word);
        // First occurrence starts at 1; otherwise add one to the stored count.
        int updated = (seen == null) ? 1 : seen + 1;
        counts.put(word, updated);
        collector.emit("print-stream", new Values(word, updated));
    }

    /** Declares "print-stream", the only stream this bolt emits on. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("word", "count");
        declarer.declareStream("print-stream", schema);
    }
}
// Wiring for the single-custom-stream variant. main() is written in the article's
// abbreviated form (no modifiers or parameters).
class Topology {
    main(){
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("spout", new RandomSentenceSpout(), 1);

        // Each grouping names the upstream component AND the stream-id it reads from it.
        topology.setBolt("split", new SplitSentenceBolt(), 2)
                .shuffleGrouping("spout", "split-stream");
        topology.setBolt("count", new WordCountBolt(), 2)
                .fieldsGrouping("split", "count-stream", new Fields("word"));
        topology.setBolt("print", new PrinterBolt(), 1)
                .shuffleGrouping("count", "print-stream");
    }
}
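使用自定义stream-id,主要分成两个步骤:(1)组件在emit时指定stream-id,并在declareOutputFields中用declareStream声明同名的stream-id;(2)构建Topology时,下游Bolt的分组方法除了指定上游的component-id,还要指定要订阅的stream-id。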
下图详细说明了拓扑图中各个组件是怎么协调工作的:
Spout/Bolt发射时可以指定多个stream-id,同样要在声明输出字段时指定所有在发射过程指定的stream-id。
虽然每条消息的输出消息流并不一定会用到所有的stream,比如下面示例中一条消息发射到stream1和stream3,
另外一条消息发射到stream2和stream3,stream1和stream2是互斥的,不可能同时发送到这两个stream。
但是可以看到在declareStream中,要同时指定所有的stream-id。
/**
 * Routes a word by comparing it with "j": smaller words go to "stream1", larger words to
 * "stream2", and every word (including "j" itself) goes to "stream3".
 */
public void execute(Tuple input) {
    String word = input.getString(0);
    int order = word.compareTo("j");

    // stream1 and stream2 are mutually exclusive; a word equal to "j" reaches neither.
    if (order < 0) {
        collector.emit("stream1", new Values(word));
    } else if (order > 0) {
        collector.emit("stream2", new Values(word));
    }

    // Unconditional: every word is also sent to stream3.
    collector.emit("stream3", new Values(word));
}
/**
 * Declares every stream-id that execute() may emit on, even though a single tuple never
 * uses all of them.
 */
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
    String[] streamIds = {"stream1", "stream2", "stream3"};
    for (String streamId : streamIds) {
        outputFieldsDeclarer.declareStream(streamId, new Fields("word"));
    }
}
程序员都喜欢流程图,喏,下图左上角第一个就是了,右上角是对应到Storm中的Topology,下面两图示例了两条消息在Storm的消息流的走向。
仿照上面的示例,对WordCountTopology的Spout/Bolt的发射方法都指定一个输出的stream-id,同时在declareOutputFields声明多个输出的stream-id。
现在虽然Spout/Bolt声明了多个输出stream-id,但是 emit时还是只发射到一个stream-id 中。
所以本质上和前面的SingleStream是一样的,所以Topology不需要做任何改动也还是可以运行的。
代码说明:发射时指定 一个 stream-id,声明流时指定 多个 stream-id,topology设置Bolt时除了通过Group的component-id,还会指定上游组件的stream-id
emit不变,topology不变
// Excerpt: the body of nextTuple() is abbreviated; `sentence` and `collector` are not part of it.
class RandomSentenceSpout {
    public void nextTuple() {
        // ⬅ still emits on exactly one stream
        this.collector.emit("split-stream", new Values(sentence));
    }

    /** Declares three stream-ids with the same field layout; only "split-stream" is emitted on. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("sentence");
        declarer.declareStream("split-stream", schema); // ⬅ the stream actually used by emit
        declarer.declareStream("count-stream", schema);
        declarer.declareStream("print-stream", schema);
    }
}
// Excerpt: the body of execute() is abbreviated; `words` and `collector` are not part of it.
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        for (String token : words) {
            // ⬅ still emits on exactly one stream
            this.collector.emit("count-stream", new Values(token));
        }
    }

    /** Declares three stream-ids with the same field layout; only "count-stream" is emitted on. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("word");
        declarer.declareStream("split-stream", schema);
        declarer.declareStream("count-stream", schema); // ⬅ the stream actually used by emit
        declarer.declareStream("print-stream", schema);
    }
}
// Excerpt: the body of execute() is abbreviated; `word`, `count` and `collector` are not part of it.
class WordCountBolt {
    public void execute(Tuple tuple) {
        // ⬅ still emits on exactly one stream
        collector.emit("print-stream", new Values(word, count));
    }

    /** Declares three stream-ids with the same field layout; only "print-stream" is emitted on. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("word", "count");
        declarer.declareStream("split-stream", schema);
        declarer.declareStream("count-stream", schema);
        declarer.declareStream("print-stream", schema); // ⬅ the stream actually used by emit
    }
}
// Wiring is the same as in the single-stream variant: declaring extra streams upstream
// does not require any change here. main() is in the article's abbreviated form.
class Topology {
    main(){
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("spout", new RandomSentenceSpout(), 1);

        topology.setBolt("split", new SplitSentenceBolt(), 2)
                .shuffleGrouping("spout", "split-stream");
        topology.setBolt("count", new WordCountBolt(), 2)
                .fieldsGrouping("split", "count-stream", new Fields("word"));
        topology.setBolt("print", new PrinterBolt(), 1)
                .shuffleGrouping("count", "print-stream");
    }
}
那么我们为什么还要在Spout/Bolt中定义多个输出流呢?观察这部分代码,stream-id都是一样的,不同的是Fields部分,如果将每个Spout/Bolt的多个declarer.declareStream抽取出来:
/**
 * Declares the full, fixed set of stream-ids on {@code declarer}, all with the same
 * field layout, so every component can share one declaration routine.
 *
 * @param declarer declarer of the component being described
 * @param fields   field layout applied to every stream
 */
public static void declareStream(OutputFieldsDeclarer declarer, Fields fields) {
    String[] streamIds = {"split-stream", "count-stream", "print-stream"};
    for (String streamId : streamIds) {
        declarer.declareStream(streamId, fields);
    }
}
然后在Spout/Bolt的declareOutputFields调用declareStream方法一次声明所有的stream-id,只需要传递不同的Fields即可。
代码说明:声明多个stream时,每个组件的所有stream-id都一样,只是传入不同的Fields;emit不变,topology不变
// Excerpt: the body of nextTuple() is abbreviated; `sentence` and `collector` are not part of it.
class RandomSentenceSpout {
    public void nextTuple() {
        // ⬅ emit is unchanged: one stream per tuple
        this.collector.emit("split-stream", new Values(sentence));
    }

    /** Delegates to the shared declareStream helper, passing only this component's fields. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("sentence");
        declareStream(declarer, schema);
    }
}
// Excerpt: the body of execute() is abbreviated; `words` and `collector` are not part of it.
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        for (String token : words) {
            // ⬅ emit is unchanged: one stream per tuple
            this.collector.emit("count-stream", new Values(token));
        }
    }

    /** Delegates to the shared declareStream helper, passing only this component's fields. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("word");
        declareStream(declarer, schema);
    }
}
// Excerpt: the body of execute() is abbreviated; `word`, `count` and `collector` are not part of it.
class WordCountBolt {
    public void execute(Tuple tuple) {
        // ⬅ emit is unchanged: one stream per tuple
        collector.emit("print-stream", new Values(word, count));
    }

    /** Delegates to the shared declareStream helper, passing only this component's fields. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("word", "count");
        declareStream(declarer, schema);
    }
}
// Wiring is unchanged by the declareStream refactoring.
// main() is in the article's abbreviated form.
class Topology {
    main(){
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("spout", new RandomSentenceSpout(), 1);

        // spout --split-stream--> split
        topology.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");
        // split --count-stream--> count, partitioned by word
        topology.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));
        // count --print-stream--> print
        topology.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");
    }
}
这样的好处是,如果事先知道所有的stream-id,只需要定义好declareStream,每个bolt都调用这个全局的方法即可。
实际上这种方式对于构建 动态拓扑图 是很有用的。
通过把所有stream-id封装到一个方法中,而emit时只指定一个stream-id。
现在每个组件emit时只指定了一个stream-id,声明输出流时都指定了相同的stream-id集合。
也就是说Spout/Bolt中虽然声明了多个stream-id,但是一条消息只会选择一个stream-id。
那么可不可以对Group方式运用同样的方式呢,我们的目的是想要把setBolt这种逻辑也抽取出一个共同的方法。下面这种方式肯定是不对的,首先无法抽取,因为每个Bolt的Group分组策略不同。
虽然是错误的,但是我们并没有对首尾组件用多个Group,这是为什么呢?
1.Spout没有所谓的分组,因为Spout就是源头,分组时指定component指的是当前component的数据源自这个指定的component
2.最后一个Bolt我们先不设置,这里有坑…
// Deliberately INCORRECT wiring, kept verbatim: it reproduces the InvalidTopologyException
// quoted in the article. Every bolt subscribes to the same stream-id on all upstream
// candidates while keeping its own grouping strategy.
main(){
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("split", new SplitSentenceBolt(), 2)
.shuffleGrouping("spout", "split-stream") // ⬅ the subscription from the original topology
.shuffleGrouping("split", "split-stream")
.shuffleGrouping("count", "split-stream")
;
builder.setBolt("count", new WordCountBolt(), 2)
// Rejected at submission: the spout's streams carry only the field "sentence",
// so a fields grouping on "word" refers to a non-existent field.
.fieldsGrouping("spout", "count-stream", new Fields("word"))
.fieldsGrouping("split", "count-stream", new Fields("word")) // ⬅ the subscription from the original topology
.fieldsGrouping("count", "count-stream", new Fields("word"))
;
// The last bolt is intentionally left with a single grouping here.
builder.setBolt("print", new PrinterBolt(), 1)
.shuffleGrouping("count", "print-stream");
}
而且也无法构建拓扑图,比如WordCountBolt的输入component="spout"时,在拓扑图中这个组件是 RandomSentenceSpout,它的输出字段名称为"sentence",根本就没有word这个字段。
下面的错误也证实了这一点: Component: [count] subscribes from stream: [count-stream] of component [spout] with non-existent fields: #{"word"})
count
这个组件(即WordCountBolt)订阅了 spout
组件(即RandomSentenceSpout)的 count-stream
输出流,但是spout组件并不存在 word
字段。
6972 [main] WARN backtype.storm.daemon.nimbus - Topology submission exception. (topology name='word-count') #
<InvalidTopologyException InvalidTopologyException(msg:
Component: [count] subscribes from stream: [count-stream] of component [spout] with non-existent fields: #{"word"})>
7002 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null
正确使用多个stream-id的姿势:
// Working multi-subscription wiring: for a given upstream component the grouping kind is
// the same in every bolt, and only the subscribed stream-id differs between bolts.
// main() is in the article's abbreviated form.
main(){
    TopologyBuilder topology = new TopologyBuilder();
    topology.setSpout("spout", new RandomSentenceSpout(), 1);

    topology.setBolt("split", new SplitSentenceBolt(), 2)
            .shuffleGrouping("spout", "split-stream") // ⬅ the subscription from the original topology
            .fieldsGrouping("split", "split-stream", new Fields("word"))
            .shuffleGrouping("count", "split-stream");

    topology.setBolt("count", new WordCountBolt(), 2)
            .shuffleGrouping("spout", "count-stream")
            .fieldsGrouping("split", "count-stream", new Fields("word")) // ⬅ the subscription from the original topology
            .shuffleGrouping("count", "count-stream");

    topology.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");
}
现在每个Bolt的Group方式都是一样的了,并且component-id也是一样的,只有最后的stream-id不同。很好,可以像抽取declareStream那样抽取setBolt了:
// Same topology with the repeated grouping chain factored into the setBolt helper.
// main() is in the article's abbreviated form.
main(){
    TopologyBuilder topology = new TopologyBuilder();
    topology.setSpout("spout", new RandomSentenceSpout(), 1);

    // "split" and "count" share the helper; their stream-ids are derived from the names.
    setBolt(topology, new SplitSentenceBolt(), "split");
    setBolt(topology, new WordCountBolt(), "count");

    // The printer is still wired by hand with a single subscription.
    topology.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");
}
/**
 * Registers {@code bolt} as component {@code name} with parallelism 2 and subscribes it to
 * the stream {@code name + "-stream"} of "spout", "split" and "count".
 *
 * @param builder topology builder the bolt is added to
 * @param bolt    bolt implementation
 * @param name    component id; also the prefix of the subscribed stream-id
 */
public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name) {
    String streamId = name + "-stream";
    builder.setBolt(name, bolt, 2)
            .shuffleGrouping("spout", streamId)
            .fieldsGrouping("split", streamId, new Fields("word"))
            .shuffleGrouping("count", streamId);
}
每个Bolt都设置了多种分组策略,而分组的第一个参数component表示数据源自哪里,
现在SplitSentenceBolt和WordCountBolt都定义了三种分组策略,
那么是不是说[split]的数据源有:[spout],[split],[count],
同样[count]的数据源也有:[spout],[split],[count],这跟实际的Topology结构就完全不一样了。
可以看到下图的拓扑结构比原先的WordCountTopology多了几条线(而且还能自己指向自己我也是醉了)。
不过虽然每个Bolt都有多个输入源,但是输入源组件不一定有指定的stream-id。
比如split的数据源虽然有三个[spout],[split],[count],但是这三个组件中真正向stream-id="split-stream"发射数据的组件
只有[spout],因此即使设置了三个数据源,另外两个数据源实际上不会有数据流过来。
同样[count]的数据源虽然也有三个[spout],[split],[count],但是这三个组件中真正向stream-id="count-stream"发射数据的组件也只有[split]。
所以最后实际上拓扑图还是最原始的[spout]->[split]->[count]->[print],并不会出现之前出现的多条线以及自己指向自己的情况。
可以把最后一个PrintBolt也都加到每个Bolt的分组策略里吗?
// Deliberately FAILING wiring, kept verbatim: "print" is added as an upstream source of
// every bolt, which triggers the InvalidTopologyException quoted below in the article.
builder.setBolt("split", new SplitSentenceBolt(), 2)
.shuffleGrouping("spout", "split-stream") // ⬅ the subscription from the original topology
.fieldsGrouping("split", "split-stream", new Fields("word"))
.shuffleGrouping("count", "split-stream")
.shuffleGrouping("print", "split-stream")
;
builder.setBolt("count", new WordCountBolt(), 2)
.shuffleGrouping("spout", "count-stream")
.fieldsGrouping("split", "count-stream", new Fields("word")) // ⬅ the subscription from the original topology
.shuffleGrouping("count", "count-stream")
// This is the subscription named in the quoted error: "count-stream" does not exist on "print".
.shuffleGrouping("print", "count-stream")
;
builder.setBolt("print", new PrinterBolt(), 1)
.shuffleGrouping("spout", "print-stream")
.fieldsGrouping("split", "print-stream", new Fields("word"))
.shuffleGrouping("count", "print-stream") // ⬅ the subscription from the original topology
.shuffleGrouping("print", "print-stream")
;
拓扑图是这样的,虚线表示实际上是不存在的(因为输入源本身没有发射到这些stream)。
Oops….报错显示:[count]组件订阅了[print]组件中一个不存在的[count-stream]
9510 [main] WARN backtype.storm.daemon.nimbus - Topology submission exception. (topology name='word-count') #
<InvalidTopologyException InvalidTopologyException(msg:Component:
[count] subscribes from non-existent stream: [count-stream] of component [print])>
9552 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null
下面修改不同Bolt中和Print相关的分组方式,只有把Print全部注释掉才可以
// Variant in which the subscriptions to "print" are commented out (marked ①, ② and ③).
// Per the article, submission only succeeds once all three are commented out.
builder.setBolt("split", new SplitSentenceBolt(), 2)
.shuffleGrouping("spout", "split-stream") // ⬅ the subscription from the original topology
.fieldsGrouping("split", "split-stream", new Fields("word"))
.shuffleGrouping("count", "split-stream")
//.shuffleGrouping("print", "split-stream") //②
;
builder.setBolt("count", new WordCountBolt(), 2)
.shuffleGrouping("spout", "count-stream")
.fieldsGrouping("split", "count-stream", new Fields("word")) // ⬅ the subscription from the original topology
.shuffleGrouping("count", "count-stream")
//.shuffleGrouping("print", "count-stream") //①
;
builder.setBolt("print", new PrinterBolt(), 1)
.shuffleGrouping("spout", "print-stream")
.fieldsGrouping("split", "print-stream", new Fields("word"))
.shuffleGrouping("count", "print-stream") // ⬅ the subscription from the original topology
//.shuffleGrouping("print", "print-stream") //③
;
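发生了什么事?不存在stream为什么就不行?可是前面以SplitSentenceBolt为例,split和count也不会向split-stream发射数据啊,为什么就不会报错呢?原因在于我们的PrintBolt只是打印数据,然后什么都不做,它没有emit出任何消息,declareOutputFields也是空的,没有声明任何输出流,所以下图中从PrintBolt出来的线根本就不存在!更准确地说,Storm在提交拓扑时校验的是被订阅的stream-id有没有在上游组件的declareOutputFields中声明过,而不是运行时有没有真的emit过:split和count虽然从不向split-stream发射数据,但它们通过declareStream声明了这个stream,所以校验可以通过;PrintBolt一个stream都没有声明,所以订阅它的任何stream都会报错。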
怎么办呢,很简单,给PrintBolt添加一个带有stream-id的emit,同时也要在declareOutputFields中声明这个输出流。只要PrintBolt有输出流,就不会报错了。也就是确保每个Bolt都会往下发送消息
最终完整的代码如下:
public class WordCountTopologyStream3 {
public static class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector collector;
Random rand;
String[] sentences = null;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
rand = new Random();
sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
}
@Override
public void nextTuple() {
Utils.sleep(1000);
String sentence = sentences[rand.nextInt(sentences.length)];
System.out.println("/n" + sentence);
this.collector.emit("split-stream", new Values(sentence));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declareStream(declarer, new Fields("sentence"));
}
public void ack(Object id) {}
public void fail(Object id) {}
}
public static class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
this.collector.emit("count-stream", new Values(word));
}
this.collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declareStream(declarer, new Fields("word"));
}
}
public static class WordCountBolt extends BaseRichBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
private OutputCollector collector;
@Override
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null) count = 0;
count++;
counts.put(word, count);
collector.emit("print-stream", new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declareStream(declarer, new Fields("word", "count"));
}
}
public static class PrinterBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String first = tuple.getString(0);
int second = tuple.getInteger(1);
System.out.println(first + "," + second);
collector.emit("whatever-stream", new Values(first + ":" + second)); //⬅
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declareStream(declarer, new Fields("word:count")); //⬅
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
setBolt(builder, new SplitSentenceBolt(), "split");
setBolt(builder, new WordCountBolt(), "count");
setBolt(builder, new PrinterBolt(), "print");
Config conf = new Config();
conf.setDebug(false);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
public static void declareStream(OutputFieldsDeclarer declarer, Fields fields){
declarer.declareStream("split-stream", fields);
declarer.declareStream("count-stream", fields);
declarer.declareStream("print-stream", fields);
declarer.declareStream("whatever-stream", fields); //⬅
}
public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name){
builder.setBolt(name, bolt, 2)
.shuffleGrouping("spout", name + "-stream")
.fieldsGrouping("split", name + "-stream", new Fields("word"))
.shuffleGrouping("count", name + "-stream")
.shuffleGrouping("print", name + "-stream") //⬅
;
}
}
你以为这样就完了吗,如果把PrintBolt的输出stream-id去掉,即采用默认的default的话:
// Excerpt: prepare() and the collector field are not part of it.
/** Printer variant that emits on the default stream instead of a custom stream-id. */
public static class PrinterBolt extends BaseRichBolt {
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        int count = tuple.getInteger(1);
        System.out.println(word + "," + count);
        // Replaces: collector.emit("whatever-stream", new Values(first + ":" + second));
        collector.emit(new Values(word + ":" + count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Replaces: declareStream(declarer, new Fields("word:count"));
        // Only the default stream is declared here.
        Fields schema = new Fields("word:count");
        declarer.declare(schema);
    }
}
/**
 * Declares the three custom stream-ids on {@code declarer} with the field layout
 * {@code fields}. "whatever-stream" is no longer declared in this variant.
 */
public static void declareStream(OutputFieldsDeclarer declarer, Fields fields) {
    String[] streamIds = {"split-stream", "count-stream", "print-stream"};
    for (String streamId : streamIds) {
        declarer.declareStream(streamId, fields);
    }
    // Removed in this variant: declarer.declareStream("whatever-stream", fields);
}
/**
 * Registers {@code bolt} as component {@code name} with parallelism 2 and subscribes it to
 * the stream {@code name + "-stream"} of "spout", "split", "count" and "print".
 *
 * @param builder topology builder the bolt is added to
 * @param bolt    bolt implementation
 * @param name    component id; also the prefix of the subscribed stream-id
 */
public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name) {
    String streamId = name + "-stream";
    builder.setBolt(name, bolt, 2)
            .shuffleGrouping("spout", streamId)
            .fieldsGrouping("split", streamId, new Fields("word"))
            .shuffleGrouping("count", streamId)
            .shuffleGrouping("print", streamId);
}
还是报错:[count]组件订阅了[print]组件中不存在的[count-stream]
Component: [count] subscribes from non-existent stream: [count-stream] of component [print]
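好吧,看来前面的组件都使用自定义的stream-id,最后一个组件也必须使用自定义的stream-id,即使这个stream-id看起来没什么意义!准确地说,是最后一个组件也必须在declareOutputFields中声明所有被其他组件订阅的stream-id(这里是split-stream、count-stream、print-stream);如果只声明default流,这些订阅指向的仍然是不存在的stream,所以照样报错。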
EOF.