本文原作者为Optimizely的分布式系统工程师David Yu。
我们在Optimizely公司的使命就是帮助决策者们把数据转变为行动。这需要我们能够快速并可靠地移动数据。我们每天要处理几十亿个用户事件,包括浏览网页、点击和定制事件等。能以最快的速度将与用户有关的关键业务信息提供给我们的客户,这一直都是我们最高优先级的任务。正因如此,我们一直都在寻求创新的方式来改进我们的数据处理流水线。
在本文中会介绍为了给客户提供最实时的指标数据,我们是怎样把我们的数据处理流水线由批处理的方式转变为流处理的。
统一。在过去,我们为不同的用途而使用了两种数据库:用HBase存储计算实验指标,而用Druid来计算个性化的结果。对这两套系统的需求是大相径庭的:
实验 |
个性化 |
快速导入数据 |
导入数据可延迟 |
秒级查询延迟 |
亚秒级查询延迟 |
访问者级别指标 |
会话级别指标 |
可是,随着业务需求的不断演进,扩展开始变得非常困难。为了满足业务需求而维护一套Druid+HBase的Lambda架构已经成了技术团队不能承受的负担。我们需要一套新的解决方案,来减少后台的复杂度,并提高开发效率。更重要的是,一套统一的计算基础设施可以成为通用的平台,满足我们未来的许多产品需求。
一致性:如上文所述,这两种计算基础设施提供了不同的指标和计算能力。比如说,实验结果可以让你知道有多少用户访问了你的登陆页面,而个性化结果却告诉你是有多少个会话。我们希望能为客户提供一致的指标,并为所有的产品都提供这两种类型的统计数据。
实时结果:基于会话的结果是通过MR任务算出来的,有可能在收到事件之后延迟几个小时才能得到。实时的解决方案则会为客户提供他们的数据的最新视图。
在之前的文章中,我们介绍过 后台的数据输入流水线 ,和我们是怎样使用 Druid和MR 来基于用户会话存储事务统计数据的。我们从Druid得到的最大好处就是查询时低延迟。可是它本身也有许多固有缺点。比如,因为分段文件是不能更改的,所以不可能增量地更新索引。结果必要时我们只好每隔一段时间就不得不把用户事件重新处理一遍,这样才能解决乱序事件之类的数据问题。另外,我们也不能过于增加分片的数量,因为耗时过长的查询会变得代价巨大。
另一方面,我们也用HBase做用户访问计算。我们把每个事件写入一个HBase Cell,这样就为支持我们运行的查询带来了最大的灵活性。比如,当一位客户想要知道“有多少个用户曾经做过添加购物车的动作”时,我们只需要把范围内的数据全遍历一遍就好了。因为事件都是经过Kafka准实时地推送到HBase里的,数据基本上可以反映最新的情况。可是,我们现在的表的模式没办法保留与每个事件相关的所有元数据。这些元数据又包含着许多通用的信息,比如浏览器类型和地理位置,还有用于定制数据分区的特定用户标签等等。这些数据的冗余让我们没办法支持非常多的用户分区,分区越多,我们存储的代价就越大,查询遍历时间也越长。
因为优化Druid索引越来越难,所以我们决定抛弃Druid这个方案,专心改进我们的HBase数据表现。提前累积事件,并将冗余信息压缩掉就成了最明显的改进方案。因此我们决定求助于Samza。
由于Samza可以无缝地与我们的分布式消息队列Kafka整合在一起,所以对于我们的需求是非常合适的。在第二部分我们会详细讲述这个实时累积的过程。但是从比较高层的角度,Samza可以不断地把事件按照会话来打包,并且周期性地将未写出的数据片段以流的方式写入HBase。有了这个方法,每个HBase Cell就都成了一组事件的统一视图。
这带来了许多好处。首先要提到的就是我们计算各种不同统计数据的核心逻辑在最大程度上保持了不变。既然我们做的量最大的基础运算就是求和(当然这是极度简化了的),把一堆数字加起来,这和把一串累积值加起来是等价的。
我们得到的第二个好处是,会话级的信息是可以马上得到的,所以我们就可以从HBase中查询得到会话指标,实时地回答诸如“每个用户会话产生的平均收益是多少”这样的问题!毫不意外,新创建的HBase模式名就叫SessionDB,这成了我们后台程序标准化的基础。
最后但同样重要的,HBase的存储需求急剧下降,所以查询也可以运行得更快了。通过累积会话级别的元数据,我们不必再在各个Cell之间同步浏览器类型、位置和用户规模之类的信息。下图显示了平均查询延迟(X轴)相对于不同的用户规模(Y轴)之间的对应关系。比如每个会话上平均产生10个事件,中等的查询延迟降低到了5毫秒,以前的值是40+毫秒(黄色线)。
会话聚合(众所周知也叫会话化)并不是我们在Optimizely做的第一个流处理用例。我们曾经将流处理应用到各种ETL任务上,比如:
现在已经有了许多可以用于生产的流处理框架,比如Samza、Spark、Storm和Flink等。我们选择Samza的原因有好几点。首先,Samza允许你把若干个Kafka Topic串连起来,形成一个处理拓扑,这就有了很高的隔离性。因为我们鼓励工程师们提出不同的想法,这种可插入性给我们的数据处理流水线带来的连锁反应最小。其次,Samza可以很容易地与Kafka和YARN整合起来,而我们的系统中后两者用得非常多。另外,Samza的延迟非常低。它提供了一个简单的编程模型,以及非常易用的状态管理框架,所有这些都非常适合我们的需求。
从一个很高的层次来说,一个Samza任务可以从一个或多个Kafka Topic中消费流式的数据。然后它对这些事件做某种计算处理,一次一种,然后再把输出结果写到一个或多个下游Kafka Topic中。
关于Samza任务的一个值得注意的特征是,它们具有很高的可组合性。为了帮助你将Samza任务可以做的事情可视化,你可以把它们当成一批实时的MR任务,由Kafka Topic串连在一起。上图中显示了一个任务可以有多个输入流,以及任务之间如何连接起来,形成一个复杂的工作流。
Samza是可扩展的、容错的和有状态的。我们会简单地涉及这些方面内容,因为我们的会话化任务会在某种程度上利用这些功能。
扩展性:就像MR任务一样,Samza任务也是高度并发和可扩展的。不同点在于,与基于文件不同,Samza的并行是基于分区的。一个任务被分成多个Task。拿MR来做类比,一个Task大概可以相当于一对Mapper+Reducer。Task执行“Map”时,就是把事件输出到相应的指定Kafka Topic分区中。同时,从输入流分区中出来的事件全都会汇总给一个Task进行处理,这个行为和Reducer很像。
有个很重要的方面要牢记在心,一个输入Topic的分区总是静态地被分配给某个Task实例。那么,Task实例的数量也就直接与任意指定输入Topic的最大分区数量有关(如下图中的例子所示)。当你第一次提交一个任务时,这种映射关系就建立起来了,并且出于容错的考虑被持久化到某个Kafka Topic中。这种耦合会有助于简化Samza的状态管理模型(下文中会讲到),因为每个Task只要管理好自己的状态就可以了。
如果Task的数量是固定的,那我们还怎么横向扩展?一个Samza任务可以通过增加运行着的容器的数量来横向扩展。当Samza运行在YARN集群(也是目前为止唯一一个支持运行Samza的分布式环境)之上时,每个Task实例都是在一个容器中运行的,容器也是计算资源的唯一单位。依据一个Task需要的计算能力的多少,你可以把所有东西都运行在一个容器中,也可以按Task的数量去创建容器,来为每个Task提供最强的处理能力。上图就表示出了这种弹性。
容错:Samza承诺,假如一个容器崩溃了,不管出于什么原因,在容器重新启动后,那个容器里面的所有Task都可以从之前崩溃的点恢复。为了支持这个功能,Samza会定期地记录检查点,把各个Task消费完的偏移量记下来。它也会为它内部的状态数据库上发生的变更记录检查点。在把所有这样的信息都记入Kafka之后,任务恢复功能就可以通过消费和重放变更操作的流来重建各种各样的内部任务状态。
无状态:大多数流处理系统都要记录某些状态,你在生成用户指标时可能会是某个指定用户的事件数量,而对于我们的案例,则是某个指定的会话里面触发的全部事件。为了实现真正意义上的实时流处理,切合实际的作法必然是将状态数据保存在Task的本地,而不是别的地方。Samza自带地提供了RocksDB来做为一种有效的KV存储(就像一个嵌入式HBase),来保存那些数据量过大而无法保存在本地内存中的状态数据。如上面所述,因为在变更日志的Topic中已经将相应的改动持久化了,在Task出故障时这个数据库也是可以恢复的。
在了解了Samza的各种基本概念之后,现在就可以继续介绍我们是如何利用这个工具来不断地将各种事件归入会话的了。
简单来说,一个会话就是相对比较快速地连续触发的一系列用户事件。为了可以方便地将会话划分成不同的类型,我们要求上游的客户端为每个事件都指定一个会话ID。举个有代表性的例子,如果某些事件它们发生的时间间隔少于30分钟,那就为它们指定相同的会话ID。
这样的话,Samza用于聚合事件的事件处理逻辑就变得非常清晰了。每个任务维护一个基于Task的KV数据库,使用会话ID做为主键。在收到一个事件时,任务就会在KV数据库中进行查找。如果会话ID不存在,它就创建一个新的会话,否则就更新相应的会话。更新操作常常会带来数据合并。一般的元数据(比如IP地址、位置信息、浏览器版本等)都是在顶层聚合的,而事件的具体信息(比如事件类型、添加到购物车的价格等)就保存成了一个列表。
如之前的博客中所述,每个HBase Cell现在都存储了一个会话。为了可以不断地用会话的最新版本覆盖掉Cell的内容,我们要不断地将KV数据库里的镜像写入某个Kafka Topic。
我们之前提过的一个关于我们设计的很重要的问题是,为了让HBase中的数据保持最新,我们该以怎样的频率来将当前会话的信息保存到HBase中?我们可以在一个会话处理完了一个事件之后立刻就做这个操作。这么做的好处是它将会话更新延迟最小化了,也产生了最实时的结果。不过弊端是,这样做也会产生大量的会话更新操作,也就极大地增大了我们Kafka集群的负载。所以,我们利用了Samza的窗口功能,每分钟将会话数据成批地写入HBase。如果某个用户在60秒内触发了10次点击操作,那么在这一分钟结束时,我们就会生成一份会话数据,其中包含10个事件,而不是生成10份会话数据。这极大地减少了通过网络发送的数据量。
和会话化的逻辑听起来一样明显,我们在进行会话化开发的过程中也遇到了各种各样的挑战。
遍历RocksDB的操作会影响任务的吞吐率:如上文所述,Samza窗口在导出所有会话的快照时,会将所有的会话挂起。因为每个Samza进程都是单线程的(在将来Samza会引入多线程处理的版本),所以事件处理实际上会被阻塞起来,直到窗口结束。这意味着在极端情况下,如果窗口持续的时间超过了配置的窗口之间的间隔,那事件处理过程就压根没机会执行了。为避免这样的事情发生,你应该让窗口内运行的操作尽可能的高效,所以也该避免频繁地对RocksDB进行全量扫描。就我们而言,我们的方法是在内存中保存了一个简化版的KV数据库内容,就是为了避免这个问题。遍历一张简单哈希表的操作,毕竟还是会比对KV数据库进行磁盘扫描快得多。
要小心Kafka的日志压缩功能:日志压缩功能让Kafka根据特定的策略删除一些日志记录,而不是简单地在每个分区的末尾将内容整块删除。启用了日志压缩功能的Topic将只包含每个主键的最后一条记录。如果相应的值是NULL,那么Kafka就会在配置的时间超时之后,将那个主键删除。首先,如果你使用的Kafka版本比较旧(就是0.8.2),请保证配置项log.cleaner.enable被设置为true了。其次,某些Samza Topic,比如检查点或者变更日志之类的,就不会被压缩。在某次重新部署的过程中我们发现了这一点,当时那个任务经过了几小时才恢复处理。最后它要消费几TB的未压缩的变更日志,才能完成每个KV数据库的物化。
关于日志压缩功能我们得到的另一个教训是,如果没有删除操作的话,你就不该把这些内容写入一个有日志压缩功能的Topic。关于这一点的收获是,配置项log.retention.bytes压根不适用于有日志压缩功能的Topic。当我们的输出Topic不断地增长并且最终耗尽了我们的硬盘时,我们才最终明白了这一点。
为RocksDB配置SSD硬盘:在开发过程中我们注意到,有的时候有一些Task的处理速度会跟不上它们接收消息的速度。进一步的调查表明,这些Task的处理时间和窗口时间都相对比较长。最终我们发现根本原因在于它们是慢在与底层的RocksDB数据库的交互过程,是配给YARN实例的磁盘的缘故。这些Task的容器是部署在节点管理服务器上的,配置的是EBS磁盘。因为RocksDB针对在SSD或内存这样的快速存储上进行的快速压缩操作做了高度优化,所以在为它配置EBS硬盘时,就极大地削弱了RocksDB的IO性能。
感谢陈兴璐对本文的审校。
给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ,@丁晓昀),微信(微信号: InfoQChina )关注我们。