用了一段时间Storm后的笔记。发现可以记的东西不多,证明Storm挺简单的,你只要遵循一些简单的接口与原则,就能写出大规模实时消息处理的程序。
没接触前把Storm想象得很强大,接触后觉得它就那样可有可无,再后来又觉得没有了全部自己做也麻烦。
1. 集群管理:支持应用的部署,工作节点的管理(任务分配、HA、Scalable等),Metrics的收集。
2. 数据流的传输与路由:支持多种数据在各处理节点间自由流动,基于Netty的高效传输机制,支持轮询,多播,按属性分组的路由。
3. 数据高可靠性的保证:还支持实现数据流动了多个节点后,在某个节点的失败,可以引发数据从源头开始重传的 高级功能 。
按Storm的官方说法,你也可以自己搭建许多消息队列和worker组成的网络来实现实时处理,但是:
乏味:你大部份开发时间花费在配置消息发送到哪里,部署worker,还有部署中间队列。你所关心的实时处理逻辑对应到你的代码的只占了很少的比例 。
脆弱:你要自己负责保持每个worker和队列正常工作。
伸缩时痛苦:当单个worker或队列的消息吞吐量太高时,你需要分区,即数据如何分散。你需要重新配置其它worker,让它们发送消息到新位置。这导致删除或添加部件都可能失败。
核心代码是用Clojure写成,翻看代码非常不便。其实,它现在很多新的外部模块都用Java来写了,另外阿里同学翻写了一个 JStorm 。
Storm对可靠消息传输的支持程度,很大程度上依赖于Spout的实现。
并不默认就是支持高可靠性的,collector emit的时候要传输msgId,要自己处理ack(msgId)和fail(msgId)函数。而很多spout其实没有这样做,只有 Kafka Spout 做的比较正规。
默认的,如果三十秒,消息流经的所有下游节点没有都ack完毕,或者有一个节点报fail,则触发fail(msgId)函数。
因为ack/fail的参数只有msgId,这就要Spout想在ack/fail时对消息源如Kafka/JMS进行ack/fail,或fail时想重发消息,如果需要完整的消息体而不只是msgId才能完成时,要自己把msgId对应的消息存起来(会撑爆内存么)。
另外,因为每个Spout 是单线程的,会循环的调用nextTuple()的同时,调用ack()或fail()处理结果。所以nextTuple()如果没消息时不要长期阻塞,但也不要完全不阻塞,参考storm-starter里的spout,等个50ms好了。在JStorm里,就改为了两条分开的线程。
另外,spout有时是每次被调用nextTuple()时主动去pull消息的,有时是被动接收消息后存放在LinkedBlockingQueue里,netxtTuple()时从Queue里取消息的。如果消息源没有ack机制,Spout突然crash的话,存在queue里的消息也会丢失。
直接用BasicBolt,会在execute()后自动ack/fail Tuple,而RichBolt则需要自行调用ack/fail。
那什么时候使用RichBolt? Bolt不是在每次execute()时立刻产生新消息,需要异步的发送新消息时,又或者想异步的ack/fail原消息时就需要。
BasicBolt的prepare()里并没有collector参数,只在每次execute()时传入collector。而RichBolt刚好相反,你可以在初始化时就把collector保存起来,用它在任意时候发送消息。
另外,如果用RichBolt的collector,还要考虑在发送消息时是否带上传入的Tuple,如果不带,则下游的处理节点出错也不会回溯到Spout重发。用BasicBolt则已默认带上。
如果希望上游的Spout重发消息,则在BasicBolt中抛出FailedException 或在RichBolt中直接fail掉Tuple。其他情况下,execute()方法不应该抛出任何异常,或者你故意抛出异常使得Topology停转。
不像Linkedin的 Samza ,Storm完全不管数据的持久化,Bolt如果需要历史数据,一般会使用路由规则,比如相同用户的数据路由到同一个Bolt,然后Bolt自己在内存里管理数据。
当然,也可以用共享的NoSQL存储如Redis,但此时压力就都在Redis上了。
如下设置,所有Bolt都会在定时收到一条消息,一般用于触发sliding windows的统计等。
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 300);
如下函数用于判断是否Tick消息
protected static boolean isTickTuple(Tuple tuple) {<br /> return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)<br /> && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);<br /> }
除了使用Java代码,还可以使用Yaml来动态定义拓扑,见 https://github.com/ptgoetz/flux
并发度的定义及命令行动态扩容见 官方文档 ,另对于worker进程数的建议是Use one worker per topology per machine。
Tuple除了传基本类型与数组,AraayList,HashMap外,也可以传一下Java对象的。Storm使用 Kyro 作为序列化框架,据测比Hessian什么的都要快和小。但一定注册这些Java对象的类型,否则就会使用Java默认的序列化。
参看 官方文档 ,有两种方式注册类型,一个是storm.yaml文件,一个是Config类的registerSerialization方法。如无特殊需求,直接注册需要序列化的类就可以了,不需要自己实现一个Serializer。
Spout和Bolt的构造函数只会在submit Topology时调一次,然后序列化起来,直接发给工作节点,工作节点里实例化时不会被调用里,所以复杂的成员变量记得都定义成transient,在open(),prepare()里初始化及连接数据库等资源。
另外,需要实现close()函数清理资源,但该函数不承诺在worker进程被杀时保证被调用。
按名称提取fileds的值,取hash,再按当前的可选Tasks取模。所以,动态扩展Task数量,或某Task失效被重建的话,都可能让原来的分配完全乱掉。
比如External目录里的一堆, storm-controlib 里也有一堆,目前支持Jdbc,Redis,HBase,HDFS,Hive,甚至还有 Esper ,目标都是通过配置(比如SQL及Input/Output fields),而非代码,或尽量少的代码,实现交互。有时也可以不一定要直接用它们,当成Example Code来看就好了。
另外,与传统的Java应用思路相比,Bolt/Spout与资源连接时,比较难实现共享连接池的概念,连接池一般都是每个Bolt/Spout实例自用的,要正确处理其连接数量。
如果Worker进程失效,Supervisor进程会检查 Worker的心跳信息,重新进行创建。
如果整个机器节点失效,Nimbus会在其他节点上重新创建。
Supervisor进程和Nimbus进程,需要用Daemon程序如monit来启动,失效时自动重新启动。因为它们在进程内都不保存状态,状态都保存在本地文件和ZooKeeper,因此进程可以随便杀。
如果Nimbus进程所在的机器都直接倒了,需要在其他机器上重新启动,Storm目前没有自建支持,需要自己写脚本实现。
即使Nimbus进程不在了,也只是不能部署新任务,有节点失效时不能重新分配而已,不影响已有的线程。
同样,如果Supervisor进程失效,不影响已存在的Worker进程。
Zookeeper本身已经是按至少三台部署的HA架构了。
Storm UI也是用Clojure写的,比较难改,好在它提供了 Restful API ,可以与其他系统集成,或基于它重写一个UI。
Metrics的采样率是1/20(topology.stats.sample.rate=0.05),即Storm随机从20个事件里取出一个事件来进行统计,命中的话,counter 直接+20。
在旧版本的Storm使用旧版的ZooKeeper要启动数据清理的脚本,在新版上只要修改ZooKeeper的配置文件zoo.cfg, 默认是24小时清理一次 autopurge.purgeInterval=24
日志的配置在logback/cluster.xml文件里,Storm的日志,天然的需要Logstash + ElasticSearch的集中式日志方案
storm.local.dir 要自己建,而且不支持~/ 代表用户根目录。
storm.yaml的默认值在 https://github.com/apache/storm/blob/master/conf/defaults.yaml
1. buffer大小
2. 屏蔽ack机制