本文算是个人对Storm应用和学习的一个总结,由于不太懂Clojure语言,所以无法更多地从源码分析,但是参考了官网、好多朋友的文章,以及《Storm Applied: Strategies for real-time event processing》这本书,以及结合自己使用Storm的经历,希望对于想深入一点了解Storm原理的朋友能有所帮助,有不足之处欢迎拍砖交流。
Storm集群采用主从架构方式,主节点是Nimbus,从节点是Supervisor,有关调度相关的信息存储到ZooKeeper集群中,架构如下图所示:
具体描述,如下所示:
Storm集群的Master节点,负责分发用户代码,指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task。
Storm集群的从节点,负责管理运行在Supervisor节点上的每一个Worker进程的启动和终止。通过Storm的配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过端口号来唯一标识,一个端口号对应一个Worker进程(如果该Worker进程被启动)。
用来协调Nimbus和Supervisor,如果Supervisor因故障出现问题而无法运行Topology,Nimbus会第一时间感知到,并重新分配Topology到其它可用的Supervisor上运行。
Storm中最重要的抽象,应该就是Stream grouping了,它能够控制Spot/Bolt对应的Task以什么样的方式来分发Tuple,将Tuple发射到目的Spot/Bolt对应的Task,如下图所示:
目前,Storm Streaming Grouping支持如下几种类型:
另外,Storm还提供了用户自定义Streaming Grouping接口,如果上述Streaming Grouping都无法满足实际业务需求,也可以自己实现,只需要实现backtype.storm.grouping.CustomStreamGrouping接口,该接口重定义了如下方法:
List<Integer> chooseTasks(int taskId, List<Object> values)
上面几种Streaming Group的内置实现中,最常用的应该是Shuffle Grouping、Fields Grouping、Direct Grouping这三种,使用其它的也能满足特定的应用需求。
首先,我们理解一下Tuple Tree的概念,要计算英文句子中每个字母出现的次数,形成的Tuple Tree如下图所示:
对上述这个例子,也就是说,运行时每一个英文句子都会对应一个Tuple Tree,一个Tuple Tree可能很大,也可能很小,与具体的业务需求有关。
另外,Acker也是一个Bolt组件,只不过我们实现处理自己业务逻辑时,不需要关心Acker Bolt的实现,在提交实现的Topology到Storm集群后,会在初始化Topology时系统自动为我们的Topology增加Acker这个Bolt组件,它的主要功能是负责跟踪我们自己实现的Topology中各个Spout/Bolt所处理的Tuple之间的关系(或者可以说是,跟踪Tuple Tree的处理进度)。
下面,我们描述一下Acker的机制,如下所示:
我们编写的处理业务逻辑的Topology提交到Storm集群后,就会发生任务的调度和资源的分配,从而也会基于Storm的设计,出现各种各样的组件。我们先看一下,Topology提交到Storm集群后的运行时部署分布图,如下图所示:
通过上图我们可以看出,一个Topology的Spout/Bolt对应的多个Task可能分布在多个Supervisor的多个Worker内部。而每个Worker内部又存在多个Executor,根据实际对Topology的配置在运行时进行计算并分配。
从运行Topology的Supervisor节点,到最终的Task运行时对象,我们大概需要了解Storm抽象出来的一些概念,由于相对容易,我简单说明一下:
上面都是一些表达静态事物的概念,我们编写完成一个Topology之后,上面的组件都已静态地方式存在。下面,我们看一下提交Topology运行以后,会产生那些动态的组件(概念):
有关Topology的并行度的计算,官网上有一篇文章介绍(后面参考链接中已附上),我们这里详细解释一下,对于理解Storm UI上面的一些统计数据也会有很大帮助。在编写代码设置并行度的时候,只是一个提示信息,Storm会根据这个信息去计算运行时的并行度,这个并行度实际上是对组成一个Topology的每个Spout/Bolt的运行时表现Task的分布情况,所以我们可能会想关注从一个Topology的角度去看,这些设置了并行度的Spout/Bolt对应的运行时Task,在集群的多个Worker进程之间是如何分布的。
下面是例子给出的Topology的设计,如下图所示:
对该例子Topology配置了2个Worker,对应的代码示例如下所示:
conf.setNumWorkers(2); // 该Topology运行在Supervisor节点的2个Worker进程中 topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // 设置并行度为2,则Task个数为2*1 topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping("blue-spout"); // 设置并行度为2,设置Task个数为4 ,则Task个数为4 topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6) .shuffleGrouping("green-bolt"); // 设置并行度为6,则Task个数为6*1
那么,下面我们看Storm是如何计算一个Topology运行时的并行度,并分配到2个Worker中的:
从直观上看,其实分配Task到多个Executor中的分配结果有很多种,都能满足尽量让同类型Task在相同的Executor中,有关Storm的计算实现可以参考源码。
上述例子Topology在运行时,多个Task分配到集群中运行分布的结果,如下图所示:
一个Topology提交到Storm集群上运行,具体的处理流程非常微妙,有点复杂。首先,我们通过要点的方式来概要地说明:
上面,很多地方我使用了“可能”,实际上大部分情况下是这样的,注意了解即可。下面,我们根据Spout Task/Bolt Task运行时分布的不同情况,分别阐述如下:
Spout Task和Bolt Task运行时在Executor中运行有一点不同,如果Spout Task所在的同一个Executor中没有Bolt Task,则该Executor中只有一个Outgoing Queue用来存放将要传输到Bolt Task的队列,因为Spout Task需要从一个给定的数据源连续不断地读入数据而形成流。在一个Executor中,Spout Task及其相关组件的执行流程,如下图所示:
上图所描述的数据流处理流程,如下所示:
前面说过,Bolt Task运行时在Executor中与Spout Task有一点不同,一个Bolt Task所在的Executor中有Incoming Queue和Outgoing Queue这两个队列,Incoming Queue用来存放数据流处理方向上,该Bolt Task上游的组件(可能是一个或多个Spout Task/Bolt Task)发射过来的Tuple数据,Outgoing Queue用来存放将要传输到下游Bolt Task的队列。如果该Bolt Task是数据流处理方向上最后一个组件,而且对应execute()方法没有再进行emit()创建的Tupe数据,那么该Bolt Task就没有Outgoing Queue这个队列。在一个Executor中,一个Bolt Task用来衔接上游(Spout Task/Bolt Task)和下游(Bolt/Task)的组件,在该Bolt Task所在的Executor内其相关组件的执行流程,如下图所示:
上图所描述的数据流处理流程,如下所示:
在同一个Worker JVM实例内部,可能创建多个Executor实例,那么我们了解一下,一个Tuple是如何在两个Task之间传输的,可能存在4种情况,在同一个Executor中的情况有如下2种:
我们后面会对类似这种情况详细说明,下面给出的是,2个不同的Executor中Task运行的情况,分别如下图所示:
通过前面了解一个Spout Task和一个Bolt Task运行的过程,对上面两种情况便很好理解,不再累述。
如果是在不同的Worker进程内,也就是在两个隔离的JVM实例上,无论是否在同一个Supervisor节点上,Tuple的传输的逻辑是统一的。这里,以一个Spout Task和一个Bolt Task分别运行在两个Worker进程内部为例,如下图所示:
处理流程和前面的类似,只不过如果两个Worker进程分别在两个Supervisor节点上,这里Transfer Thread传输Tuple走的是网络,而不是本地。
下面,我们关心每一个Tuple是如何在各个Bolt的各个Task之间传输,如何将一个Tuple路由(Routing)到下游Bolt的多个Task呢?这里,我们应该了解一下,作为在Task之间传输的消息的表示形式,定义TaskMessage类的代码,如下所示:
package backtype.storm.messaging; import java.nio.ByteBuffer; public class TaskMessage { private int _task; private byte[] _message; public TaskMessage(int task, byte[] message) { _task = task; _message = message; } public int task() { return _task; } public byte[] message() { return _message; } public ByteBuffer serialize() { ByteBuffer bb = ByteBuffer.allocate(_message.length + 2); bb.putShort((short) _task); bb.put(_message); return bb; } public void deserialize(ByteBuffer packet) { if (packet == null) return; _task = packet.getShort(); _message = new byte[packet.limit() - 2]; packet.get(_message); } }
可见,每一个Task都给定一个Topology内部唯一的编号,就能够将任意一个Tuple正确地路由到下游需要处理该Tuple的Bolt Task。
假设,存在一个Topology,包含3个Bolt,分别为Bolt1、Bolt2、Bolt3,他们之间的关系也是按照编号的顺序设置的,其中Bolt1有个2个Task,Bolt2有2个Task,Bolt3有2个Task,这里我们只关心Bolt1 Task到Bolt2 Task之间的数据流。具体的路由过程,如下图所示:
上图中,Bolt2的两个Task分布到两个Worker进程之内,所以,当上游的Bolt1的2个Task处理完输入的Tuple并生成新的Tuple后,会有根据Bolt2的Task的编号,分别进行如下处理:
通过上面处理流程可以看出,每个Executor应该维护Task与所在的Executor之间的关系,这样才能正确地将Tuple传输到目的Bolt Task进行处理。
本文基于 署名-非商业性使用-相同方式共享 4.0 许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。