转载

Storm介绍

作者:Jack47

如果喜欢我写的文章,欢迎关注我的微信公众账号程序员杰克,两边的文章会同步,也可以添加我的RSS订阅源。

PS:吐槽一下,感觉博客园的mardown渲染很不好看啊,标题字体不够大,让人看着不舒服。有人有好的解决办法吗?

之前的技术文章都写的有点一板一眼,太正经了。今天在文章正式开始前,跟大家八卦一下Storm的作者 Nathan Marz 吧。

Storm的作者是 Nathan Marz ,Nathan Marz在 BackType 公司工作的时候有了Storm的点子并独自一人实现了Storm。在2011年Twitter准备收购BackType之际,Nathan Marz为了提高Twitter对BackType的估值,在一篇 博客 里向外界介绍了Storm。Twitter对这项技术非常感兴趣,因此在Twitter收购BackType的时候Storm发挥了重大作用。后来Nathan Marz开源Storm时,也借着Twitter的品牌影响力而让Storm名声大震!

Storm的特点之一是可靠的消息处理机制,这个机制中最重要的一环是设计一个算法来跟踪Storm中处理的数据,确保Storm知道消息是否被完整的处理。他创造出的这个算法,极大的简化了系统的设计。Nathan Marz说这算法是他职业生涯中开发的最出色的算法之一,也说明了受过良好的计算机科学的教育是非常重要的。有趣的是发明这个算法的那天,正好是他和不久前遇到的一个姑娘约会的日子。当天因为发明了这个算法而非常兴奋,导致他心思一直在这个算法上,毫无疑问就搞砸了和这个姑娘的约会!

Storm是什么

Storm官方网站 有段简介

Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。

在Storm之前,进行实时处理是非常痛苦的事情: 需要维护一堆消息队列和消费者,他们构成了非常复杂的图结构。消费者进程从队列里取消息,处理完成后,去更新数据库,或者给其他队列发新消息。

这样进行实时处理是非常痛苦的。我们主要的时间都花在关注往哪里发消息,从哪里接收消息,消息如何序列化,真正的业务逻辑只占了源代码的一小部分。一个应用程序的逻辑运行在很多worker上,但这些worker需要各自单独部署,还需要部署消息队列。最大问题是系统很脆弱,而且不是容错的:需要自己保证消息队列和worker进程工作正常。

Storm完整地解决了这些问题。它抽象了消息传递,会自动地在集群机器上并发地处理流式计算,让你专注于实时处理的业务逻辑。

Storm的应用

跟Hadoop不一样,Storm是没有包括任何存储概念的计算系统。这就让Storm可以用在多种不同的场景下:非传统场景下数据动态到达或者数据存储在数据库这样的存储系统里(或数据是被实时操控其他设备的控制器(如交易系统)所消费)

Storm有很多应用:实时分析,在线机器学习(online machine learning),连续计算(continuous computation),分布式RPC、ETL等。Storm处理速度很快:每个节点每秒钟可以处理超过百万的数据组。它是可扩展(scalable),容错(fault-tolerant),保证你的数据会被处理,并且很容易搭建和操作。

例如Nathan Marz提供的 例子 ,产生Twitter的趋势信息。Twitter从海量推文中抽取趋势信息,并在本地区域和国家层级进行维护。这意味者一旦一个案例开始出现,Twitter的话题趋势算法就能实时的鉴别出这个话题。这个实时的算法就是通过在Storm上连续分析Twitter数据来实现的。

其他开源的大数据解决方案

下表列出了一组开源的大数据解决方案,包括传统的批处理和流式处理的应用程序。

解决方案 开发者 类型 描述
Storm Twitter 流式处理 Twitter的流式处理大数据分析方案
S4 Yahoo! 流式处理 Yahoo!的分布式流式计算平台
Hadoop Apache 批处理 MapReduce范式的第一个开源实现
Spark UC Berkeley AMPLab 批处理 支持内存数据集和弹性恢复的分析平台

Yahoo! S4和Storm之间的关键差别是Storm在故障的情况下可以保证消息的处理,而S4可能会丢消息。

Hadoop无疑是大数据分析的王者,本质上是一个批量处理系统,它专注于大数据的批量处理。数据存储在Hadoop 文件系统里(HDFS)并在处理的时候分发到集群中的各个节点。当处理完成,产出的数据放回到HDFS上。在Storm上构建的拓扑处理的是持续不断的流式数据。不同于Hadoop的任务,这些处理过程不会终止,会持续处理到达的数据。

Hadoop处理的是静态的数据,而Storm处理的是动态的、连续的数据。Twitter的用户每天都会发上千万的推,所以这种处理技术是非常有用的。Storm不仅仅是一个传统的大数据分析系统:它是一个复杂事件 (complex event-processing) 处理系统的例子。复杂事件处理系统通常是面向检测和计算的,这两部分都可以通过用户定义的算法在Storm中实现。例如,复杂事件处理可以用来从大量的事件中区分出有意义的事件,然后对这些事件实时处理。

Storm模型

Storm实现了一个数据流(data flow)的模型,在这个模型中数据持续不断地流经一个由很多转换实体构成的网络。一个数据流的抽象叫做流(stream),流是无限的元组(Tuple)序列。元组就像一个可以表示标准数据类型(例如int,float和byte数组)和用户自定义类型(需要额外序列化代码的)的数据结构。每个流由一个唯一的ID来标示的,这个ID可以用来构建拓扑中各个组件的数据源。

如下图所示,其中的水龙头代表了数据流的来源,图中有三个流,用不同的颜色来表示,每个数据流中流动的是元组(Tuple),它承载了具体的数据。元组通过流经不同的转换实体而被处理。

Storm介绍

概念

Storm中涉及的主要概念有:

  1. 拓扑(Topologies)
  2. 元组(Tuple)
  3. 流(Streams)
  4. Spouts(喷嘴)
  5. Bolts
  6. 任务(Tasks)
  7. 组件(Component)
  8. 流分组(Stream groupings)
  9. 可靠性(Reliability)
  10. Workers(工作进程)

可以看到Storm中各个概念的名字起的非常好,也很形象。

拓扑(Topologies)

一个Storm拓扑打包了一个实时处理程序的逻辑。一个Storm拓扑跟一个MapReduce的任务(job)是类似的。主要区别是MapReduce任务最终会结束,而拓扑会一直运行(当然直到你杀死它)。一个拓扑是一个通过流分组(stream grouping)把Spout和Bolt连接到一起的图结构。图的每条边代表一个Bolt订阅了其他Spout或者Bolt的输出流。一个拓扑就是一个复杂的多阶段的流计算。

资源

  • TopologyBuilder : 使用这个类来在Java中创建拓扑
  • 在生产集群中运行拓扑
  • 本地模式 : 通过阅读这篇可以学习到如何在本地模式下进行拓扑的开发和测试

元组(Tuple)

元组是Storm提供的一个轻量级的数据格式,可以用来包装你需要实际处理的数据。一个元组是一个命名的值列表,其中的每个值都可以是任意类型的。元组是动态地进行类型转化的--字段的类型不需要事先声明。在Storm中编程时,就是在操作和转换由元组组成的流。通常,元组包含整数,字节,字符串,浮点数,布尔值和字节数组等类型。要想在元组中使用自定义类型,就需要实现自己的序列化方式。

资源

  • 元组(Tuple)

流(Streams)

流是Storm中的核心抽象。一个流由无限的元组序列组成,这些元组会被分布式并行地创建和处理。通过流中元组包含的字段名称来定义这个流。

每个流声明时都被赋予了一个ID。只有一个流的Spout和Bolt非常常见,所以 OutputFieldsDeclarer 提供了不需要指定ID来声明一个流的函数(Spout和Bolt都需要声明输出的流)。这种情况下,流的ID是默认的“default”。

资源

  • OutputFieldsDeclarer : 用来声明流和流的定义
  • Serialization : Storm元组的动态类型转化,声明自定义的序列化方式
  • ISerialization : 自定义的序列化必须实现这个接口
  • CONFIG.TOPOLOGY_SERIALIZATIONS : 可以通过这个配置来注册自定义的序列化接口

Spouts

Spout(喷嘴,这个名字很形象)是Storm中流的来源。通常Spout从外部数据源,如消息队列中读取元组数据并吐到拓扑里。Spout可以是可靠的(reliable)或者不可靠(unreliable)的。可靠的Spout能够在一个元组被Storm处理失败时重新进行处理,而非可靠的Spout只是吐数据到拓扑里,不关心处理成功还是失败了。

Spout可以一次给多个流吐数据。此时需要通过 OutputFieldsDeclarerdeclareStream 函数来声明多个流并在调用 SpoutOutputCollector 提供的 emit 方法时指定元组吐给哪个流。

Spout中最主要的函数是 nextTuple 。如果没有新的元组过来,就直接返回,否则把新元组吐到拓扑里。 nextTuple 必须是非阻塞的,因为Storm在同一个线程里执行Spout的函数。

Spout中另外两个主要的函数是 ackfail 。当Storm检测到一个从Spout吐出的元组在拓扑中成功处理完时调用 ack ,没有成功处理完时调用 fail 。只有可靠型的Spout会调用 ackfail 函数。更多细节可以查看 Storm Java文档 。

Bolts

在拓扑中所有的计算逻辑都是在Bolt中实现的。一个Bolt可以处理任意数量的输入流,产生任意数量新的输出流。Bolt可以做函数处理,过滤,流的合并,聚合,存储到数据库等操作。Bolt就是流水线上的一个处理单元,把数据的计算处理过程合理的拆分到多个Bolt、合理设置Bolt的task数量,能够提高Bolt的处理能力,提升流水线的并发度。

Bolt可以给多个流吐出元组数据。此时需要使用 OutputFieldsDeclarerdeclareStream 方法来声明多个流并在使用 [OutputColletor](https://storm.apache.org/javadoc/apidocs/backtype/storm/task/OutputCollector.html)emit 方法时指定给哪个流吐数据。

当你声明了一个Bolt的输入流,也就订阅了另外一个组件的某个特定的输出流。如果希望订阅另一个组件的所有流,需要单独挨个订阅。InputDeclarer有语法糖来订阅ID为默认值的流。例如 declarer.shuffleGrouping("redBolt") 订阅了redBolt组件上的默认流,跟 declarer.shuffleGrouping("redBolt", DEFAULT_STREAM_ID) 是相同的。

在Bolt中最主要的函数是 execute 函数,它使用一个新的元组当作输入。Bolt使用 OutputCollector 对象来吐出新的元组。Bolts必须为处理的每个元组调用 OutputCollectorack 方法以便于Storm知道元组什么时候被各个Bolt处理完了(最终就可以确认Spout吐出的某个元组处理完了)。通常处理一个输入的元组时,会基于这个元组吐出零个或者多个元组,然后确认(ack)输入的元组处理完了,Storm提供了 IBasicBolt 接口来自动完成确认。

必须注意 OutputCollector 不是线程安全的,所以所有的吐数据(emit)、确认(ack)、通知失败(fail)必须发生在同一个线程里。更多信息可以参照 问题定位 。

资源

  • IRichBolt : 这是Bolt的通用接口
  • IBasicBolt : 很方便的Bolt接口,用于定义做过滤或者简单处理的Bolt
  • OutputCollector : Bolt通过这个类的实例来吐元组给输出流
  • 保证消息处理 :

任务(Tasks)

每个Spout和Bolt会以多个任务(Task)的形式在集群上运行。每个任务对应一个执行线程,流分组定义了如何从一组任务(同一个Bolt)发送元组到另外一组任务(另外一个Bolt)上。可以在调用 TopologyBuildersetSpoutsetBolt 函数时设置每个Spout和Bolt的并发数。

流分组(Stream Grouping

定义拓扑的时候,一部分工作是指定每个Bolt应该消费哪些流。流分组定义了一个流在一个消费它的Bolt内的多个任务(task)之间如何分组。流分组跟计算机网络中的路由功能是类似的,决定了每个元组在拓扑中的处理路线。

在Storm中有七个内置的流分组策略,你也可以通过实现 CustomStreamGrouping 接口来自定义一个流分组策略:

  1. 洗牌分组(Shuffle grouping): 随机分配元组到Bolt的某个任务上,这样保证同一个Bolt的每个任务都能够得到相同数量的元组。
  2. 字段分组(Fields grouping): 按照指定的分组字段来进行流的分组。例如,流是用字段“user-id"来分组的,那有着相同“user-id"的元组就会分到同一个任务里,但是有不同“user-id"的元组就会分到不同的任务里。
  3. Partial Key grouping: 跟字段分组一样,流也是用指定的分组字段进行分组的,但是在多个下游Bolt之间是有负载均衡的,这样当输入数据有倾斜时可以更好的利用资源。 这篇论文 很好的解释了这是如何工作的,有哪些优势。
  4. All grouping: 流会复制给Bolt的所有任务。小心使用这种分组方式。
  5. Global grouping: 整个流会分配给Bolt的一个任务。具体一点,会分配给有最小ID的任务。
  6. 不分组(None grouping): 说明不关心流是如何分组的。目前,None grouping等价于洗牌分组。
  7. Direct grouping:一种特殊的分组。对于这样分组的流,元组的生产者决定消费者的哪个任务会接收处理这个元组。只能在声明做直连的流(direct streams)上声明Direct groupings分组方式。只能通过使用 emitDirect 系列函数来吐元组给直连流。一个Bolt可以通过提供的 TopologyContext 来获得消费者的任务ID,也可以通过OutputCollector对象的 emit 函数(会返回元组被发送到的任务的ID)来跟踪消费者的任务ID。
  8. Local or shuffle grouping:如果目标Bolt在同一个worker进程里有一个或多个任务,元组就会通过洗牌的方式分配到这些同一个进程内的任务里。否则,就跟普通的洗牌分组一样。

资源

  • TopologyBuilder : 使用这个类来定义拓扑
  • InputDeclarer : 当调用 TopologyBuildersetBolt 函数时会返回这个对象,它用来声明一个Bolt的输入流并指定流的分组方式。
  • CoordinatedBolt : 这个Bolt对于分布式的RPC拓扑很有用,大量使用了直连流(direct streams)和直连分组(direct groupings)

可靠性(Reliability)

Storm保证了拓扑中Spout产生的每个元组都会被处理。Storm是通过跟踪每个Spout所产生的所有元组构成的树形结构并得知这棵树何时被完整地处理来达到可靠性。每个拓扑对这些树形结构都有一个关联的“消息超时”。如果在这个超时时间里Storm检测到Spout产生的一个元组没有被成功处理完,那Sput的这个元组就处理失败了,后续会重新处理一遍。

为了发挥Storm的可靠性,需要你在创建一个元组树中的一条边时告诉Storm,也需要在处理完每个元组之后告诉Storm。这些都是通过Bolt吐元组数据用的 OutputCollector 对象来完成的。标记是在 emit 函数里完成,完成一个元组后需要使用 ack 函数来告诉Storm。

这些都在“ 保证消息处理 ”一文中会有更详细的介绍。

Workers(工作进程)

拓扑以一个或多个Worker进程的方式运行。每个Worker进程是一个物理的Java虚拟机,执行拓扑的一部分任务。例如,如果拓扑的并发设置成了300,分配了50个Worker,那么每个Worker执行6个任务(作为Worker内部的线程)。Storm会尽量把所有的任务均分到所有的Worker上。

资源

  • Config.TOPOLOGY_WORKERS: 这个配置设置了执行拓扑时分配Worker的数量。

Storm中用到的技术

ZeroMQ 提供了可扩展环境下的传输层高效消息通信,一开始Storm的内部通信使用的是ZeroMQ,后来作者想把Storm移交给Apache开源基金会来管理,而ZeroMQ的许可证书跟Apache基金会的政策有冲突,所以就改用了 Netty 。

Clojure Storm系统的实现语言。Clojure是由Rich Hicky作为一种通用语言发明的,它衍生自Lisp语言,简化了多线程编程。

Apache ZooKeeper Zookeeper是一个实现高可靠的分布式协作的开源项目。Storm使用Zookeeper来协调集群中的多个节点。

Preview of Storm

参考资料:

History of Apache Storm and lessons learned 推荐大家读一读,是Storm作者Nathan Marz写的,文章讲述了Storm的构思、创建过程和Storm的市场营销,沟通交流和社区开发的故事。

twitter storm --IBM developerworks

Process real-time big data with Twitter Storm

Spark, an alternative for fast data analytics

正文到此结束
Loading...