作者:Jack47
PS:如果喜欢我写的文章,欢迎关注我的微信公众账号程序员杰克,两边的文章会同步,也可以添加我的RSS订阅源。
本文主要翻译自Storm官方文档 Guaranteeing message processing ,但我觉得官方文档写的有些随意,啰嗦,所以做了一些修改,里面的配图自己重新画了,能够更加贴切的表达意思。
Storm可以保证从Spout发出的每个消息都能被完全处理。Storm的可靠性机制是完全分布式的(distributed),可伸缩的(scalable),容错的(fault-tolerant)。本文介绍了Storm如何保证可靠性以及作为Storm使用者,我们需要怎么做,才能充分利用Storm的可靠性。理解一些实现细节,也能够帮助我们领悟Storm的设计理念。
PS:本文用到了Storm的一些基本概念,例如Bolt,任务(Task),元组(Tuple),如果不清楚这些概念,可以参看我之前写的文章:Storm介绍(一),理解Storm并发。下文中元组(Tuple),跟消息(message)是等价的,Storm中处理的消息是用元组这种数据结构来表示的。
考虑如下的 流式计算文章中单词个数的拓扑 :
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com", 22133, "sentence_queue", new StringScheme())); builder.setBolt("split", new SplitStentence(), 10).shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 20).fieldsGrouping("split", new Fields("word"));
这个拓扑由3个处理单元组成:一个叫"sentences"的Spout,负责从 Kestrel 队列中读取句子并作为新的Spout元组发送出去。名称为"split"的Bolt是Spout元组的下游消费方,它把接收到句子切分成单词并发送出去。名称为"count"的Bolt是"split" Bolt的下游消费方,它使用 HashMap<String, Interger>
存储了每个任务中每个单词出现的次数,每次读取到新的单词元组就让该单词的计数加一。"count" Bolt接收"split" Bolt发出的消息时,是使用元组中的"word"(单词)字段来作为路由策略,所以相同的单词元组会被路由到相同的任务(task)里,这样就能够计数了。
在下游的Bolt中会基于某个Spout元组发射出很多新的元组:句子中的每个单词会生成一个新元组(在split Bolt完成),每个单词的计数更新后(在count Bolt完成)也会触发一个新的元组。某个Spout元组触发的消息树如下图:
一个Spout元组触发的消息树
可以看到这棵消息树的根节点是Spout产生的句子内容为"the cow jumped over the moon"的元组。这个Spout元组在"split"这个Bolt里被切分为6个单词,触发了6个单词元组,"count" Bolt接收到这6个单词元组后,更新了每个单词的计数并为之产生了一个新的元组。
一条消息被“完整处理”
指一个从Spout发出的元组所触发的消息树中所有的消息都被Storm处理了。如果在指定的超时时间里,这个Spout元组触发的消息树中有任何一个消息没有处理完,就认为这个Spout元组处理失败了。这个超时时间是通过每个拓扑的 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 配置项来进行配置的,默认是30秒。
在前面消息树的例子里,只有消息树中所有的消息(包含一条Spout消息,六条split Bolt消息,六条count Bolt消息)都被Storm处理完了,才算是这条Spout消息被完整处理了。
当消息没有被完整处理或者处理失败了会怎么样?为了理解这个问题,应该首先看一下Spout发出的一个元组的生命周期。Spout需要实现的接口(接口文档 见这里 )如下:
public interface ISpout extends Serializable { void open(Map conf, TopologyContext context, SpoutOutputCollector collector); void close(); void nextTuple(); void ack(Object msgId); void fail(Object msgId); }
首先,Storm通过调用Spout的 nextTuple
函数来从Spout请求一个元组。Spout任务使用 open
函数入参中提供的 SpoutOutputCollector
来给Spout任务的某个输出流发射一个新元组。当发射一个元组时, Spout
提供了一个"消息标识"(message-id),用来后续识别这个元组。例如,上面的例子里,sentence Spout从Kestrel队列中读取一条消息,然后把Kestrel提供的这个消息的message-id作为"消息标识"来发送出去。向 SpoutOutputCollector
中发送消息的例子如下:
_collector.emit(new Values("the cow jumped over the moon"), msgId);
接下来,元组就被发送到下游的Bolt进行消费,Storm会负责跟踪这个Spout元组创建的消息树。如果Storm检测到一个元组被完整地处理了,Storm会调用产生这个元组的Spout任务(Spout Bolt有多个任务来运行)的 ack
函数,参数是Spout之前发送这个消息时提供给Storm的message-id。类似的,当元组处理超时或处理失败时,Storm会在元组对应的Spout任务上调用 fail
函数,参数是之前Spout发送这个消息时提供给Storm的message-id。这样应用程序通过实现Spout Bolt中的 ack
接口和 fail
接口来处理消息处理成功和失败的情况。例如当消息处理成功时记录当前处理的进度,当处理失败时,重新发送消息来对这个消息进行重新处理。但在本文的例子里 fail
函数中不需要做任何处理,因为这些元组不会从Kestrel队列中去掉,下次从队列取消息,仍然会取到这些消息,只有处理成功后,才会从Kestrel队列中摘除这些消息。
作为Storm用户,如果想利用Storm的可靠性,需要做两件事:
1. 创建一个元组时(消息树上创建一个新节点)需要通知Storm 2. 处理完一个元组,需要通知Storm
通过这两个操作,当消息树被完全处理完,Storm就可以立即检测到,从而可以正确地确认这个Spout元组处理成功或者失败。Storm的API提供了一套简洁地处理这些操作的方法。
在Storm消息树(元组树)中添加一个子结点的操作叫做 锚定
(anchoring)。在应用程序发送一个新元组时候,Storm会在幕后做锚定。还是之前的流式计算单词个数的例子,请看如下的代码片段:
public class SplitSentence extends BaseRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector){ _collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { _collector.emit(tuple, new Values(word)); } _collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
每个单词元组是通过把输入的元组作为 emit
函数中的第一个参数来做锚定的。通过锚定,Storm就能够得到元组之间的关联关系(输入元组触发了新的元组),继而构建出Spout元组触发的整个消息树。所以当下游处理失败时,就可以通知Spout当前消息树根节点的Spout元组处理失败,让Spout重新处理。相反,如果在 emit
的时候没有指定输入的元组,叫做 不锚定
:
_collector.emit(new Values(word));
这样发射单词元组,会导致这个元组 不被锚定(unanchored)
,这样Storm就不能得到这个元组的消息树,继而不能跟踪消息树是否被完整处理。这样下游处理失败,不能通知到上游的Spout任务。不同的应用的有不同的容错处理方式,有时候需要这样不锚定的场景。
一个输出的元组可以被锚定到多个输入元组上,叫做 多锚定(multi-anchoring)
。这在做流的合并或者聚合的时候非常有用。一个多锚定的元组处理失败,会导致Spout上重新处理对应的多个输入元组。多锚定是通过指定一个多个输入元组的列表而不是单个元组来完成的。例如:
List<Tuple> anchors = new ArrayList<Tuple>(); anchors.add(tuple1); anchors.add(tuple2); _collector.emit(anchors, new Values(word));
多锚定会把这个新输出的元组添加到多棵消息树上。注意多锚定可能会打破消息的树形结构,变成有向无环图(DAG),Storm的实现既支持树形结构,也支持有向无环图(DAG)。在本文中,提到的消息树跟有向无环图是等价的。消息之间的关系是有向无环图的例子见下图:
消息形成的有向无环图
Spout元组A触发了B和C两个元组,而这两个元组作为输入,共同作用后触发D元组。
锚定
的作用就是指定元组树的结构--下一步是当元组树中某个元组已经处理完成时,通知Storm。通知是通过 OutputCollector
中的 ack
和 fail
函数来完成的。例如上面流式计算单词个数例子中的 split Bolt
的实现SplitSentence类,可以看到句子被切分成单词后,当所有的单词元组都被发射后,会确认(ack)输入的元组处理完成。
可以利用 OutputCollector
的 fail
函数来立即通知Storm,当前消息树的根元组处理失败了。例如,应用程序可能捕捉到了数据库客户端的一个异常,就显示地通知Storm输入元组处理失败。通过显示地通知Storm元组处理失败,这个Spout元组就不用等待超时而能更快地被重新处理。
Storm需要占用内存来跟踪每个元组,所以每个被处理的元组都必须被确认。因为如果不对每个元组进行确认,任务最终会耗光可用的内存。
做聚合或者合并操作的Bolt可能会延迟确认一个元组,直到根据一堆元组计算出了一个结果后,才会确认。聚合或者合并操作的Bolt,通常也会对他们的输出元组进行多锚定。
Storm 0.7.0引入了“事务拓扑”(transactional topologies)的特性,它让你在大多数场景下能够得到完全容错的只被处理一次的消息语义。更多关于事物拓扑的介绍见 这里
一个Storm拓扑有一组特殊的"acker"任务,它们负责跟踪由每个Spout元组触发的消息的处理状态。当一个"acker"看到一个Spout元组产生的有向无环图中的消息被完全处理,就通知当初创建这个Spout元组的Spout任务,这个元组被成功处理。可以通过拓扑配置项 Config.TOPOLOGY_ACKER_EXECUTORS 来设置一个拓扑中acker任务 executor
的数量。Storm默认 TOPOLOGY_ACKER_EXECUTORS
和拓扑中配置的Worker的数量相同(关于executor和Worker的介绍,参见理解Storm并发一文)--对于需要处理大量消息的拓扑来说,需要增大acker executor的数量。
理解Storm的可靠性实现方式的最好方法是查看元组的生命周期和元组构成的有向无环图。当拓扑的Spout或者Bolt中创建一个元组时,都会被赋予一个随机的64比特的标识(message id)。acker任务使用这些id来跟踪每个Spout元组产生的有向无环图的处理状态。在Bolt中产生一个新的元组时,会从锚定的一个或多个输入元组中拷贝所有Spout元组的message-id,所以每个元组都携带了自己所在元组树的根节点Spout元组的message-id。当确认一个元组处理成功了,Storm就会给对应的acker任务发送特定的消息--通知acker当前这个Spout元组产生的消息树中某个消息处理完了,而且这个特定消息在消息树中又产生了一个新消息(新消息锚定的输入是这个特定的消息)。
举个例子,假设"D"元组和"E"元组是基于“C”元组产生的,那么下图描述了确认“C”元组成功处理后,元组树的变化。图中虚线框表示的元组代表已经在消息树上被删除了:
确认元组成功处理后消息树的变化
由于在“C”从消息树中删除(通过acker函数确认成功处理)的同时,“D”和“E”也被添加到(通过emit函数来锚定的)元组树中,所以这棵树从来不会被提早处理完。
正如上面已经提到的,在一个拓扑中,可以有任意数量的acker任务。这导致了如下的两个问题:
Storm采用对元组中携带的Spout元组message-id哈希取模的方法来把一个元组映射到一个acker任务上(所以同一个消息树里的所有消息都会映射到同一个acker任务)。因为每个元组携带了自己所处的元组树中根节点Spout元组(可能有多个)的标识,所以Storm就能决定通知哪个acker任务。
当一个Spout任务产出一个新的元组,仅需要简单的发送一个消息给对应的acker(Spout元组message-id哈希取模)来告知Spout的任务标示(task id),以此来通知acker当前这个Spout任务负责这个消息。当acker看到一个消息树被完全处理完,它就能根据处理的元组中携带的Spout元组message-id来确定产生这个Spout元组的task id,然后通知这个Spout任务消息树处理完成(调用 Spout任务的 ack
函数)。
对于拥有上万节点(或者更多)的巨大的元组树,跟踪所有的元组树会耗尽acker使用的内存。acker任务不显示地(记录完整的树型结构)跟踪元组树,相反它使用了一种每个Spout元组只占用固定大小空间(大约20字节)的策略。这个跟踪算法是Storm工作的关键,而且是重大突破之一。
一个acker任务存储了从一个Spout元组message-id到一对值的映射关系 spout-message-id--><spout-task-id, ack-val>
。第一个值是创建了这个Spout元组的任务id,用来后续处理完成时通知到这个Spout任务。第二个值是一个64比特的叫做“ack val”的数值。它是简单的把消息树中所有被创建或者被确认的元组message-id异或起来的值。每个消息创建和被确认处理后都会异或到"ack val"上, A xor A = 0
,所以当一个“ack val”变成了0,说明整个元组树都完全被处理了。无论是很大的还是很小的元组树,"ack val"值都代表了整个元组树中消息的处理状态。由于元组message-id是随机的64比特的整数,所以同一个元组树中不同元组message-id发生撞车的可能性特别小,因此“ack val”意外的变成0的可能性非常小。如果真的发生了这种情况,而恰好这个元组也处理失败了,那仅仅会导致这个元组的数据丢失。
使用异或操作来跟踪消息树处理状态的想法非常有才。因为消息的数量可能有成千上万条,每个都单独跟踪(读者可以思考下怎么搞)是非常低效而且不可水平扩展的。而且采用异或的方式后,就不依赖于acker接收到消息的顺序了。
搞明白了可靠性的算法,让我们看看所有失败的场景下Storm如何避免数据丢失:
acker任务是轻量级的,所以在一个拓扑中不需要太多的acker任务。可以通过Storm UI(id为"__acker"的组件)来观察acker任务的性能。如果吞吐量看起来不正常,就需要添加更多的acker任务。
如果可靠性无关紧要--例如你不关心元组失败场景下的消息丢失--那么你可以通过不跟踪元组的处理过程来提高性能。不跟踪一个元组树会让传递的消息数量减半,因为正常情况下,元组树中的每个元组都会有一个确认消息。另外,这也能减少每个元组需要存储的id的数量(指每个元组存储的Spout message-id),减少了带宽的使用。
有三种方法来去掉可靠性:
Config.TOPOLOGY_ACKERS
为0。这种情况下,Storm会在Spout吐出一个元组后立马调用Spout的 ack
函数。这个元组树不会被跟踪。 emit
函数的时候通过忽略消息message-id参数来关闭这个元组的跟踪机制。 emit
的时候不要使用锚定。由于它们没有被锚定到某个Spout元组上,所以当它们没有被成功处理,不会导致Spout元组处理失败。 Guaranteeing message processing
twitter-storm如何保证消息不丢失/
Fault Tolerant Message Processing in Storm