转载

去哪儿网实时日志流处理系统的Alluxio异地存储实践

概述

互联网公司同质应用服务竞争日益激烈,业务部门亟需利用线上实时反馈数据辅助决策支持以提高服务水平。 Alluxio (前Tachyon)作为一个以内存为中心的虚拟分布式存储系统,在大数据系统性能提升以及生态系统多组件整合的进程中扮演着重要角色。本文将介绍去哪儿网(Qunar)的一个基于Alluxio的实时日志流的处理系统,在本案例中,Alluxio重点解决了异地数据存储和访问慢的问题,从而将生产环境中整个流处理流水线的性能总体提高了近10倍,而峰值时甚至达到300倍左右。

目前,去哪儿网的流处理流水线每天需要处理的业务日志量大约60亿条,总计约4.5TB的数据量。其中许多任务都需要保证在稳定的低延时情况下工作,快速迭代计算出结果并反馈到线上业务系统中。例如,无线应用的用户点击、搜索等行为产生的日志,会被实时抓取并写入到流水线中分析出对应的推荐信息,然后反馈给业务系统并展示在应用中。如何保证数据的可靠性以及低延时,就成了整个系统开发和运维工作中的重中之重。

Alluxio大数据存储系统源自于UC Berkeley AMPLab,目前由Alluxio公司在开源社区主导开发。它是世界上第一个以内存为中心的虚拟的分布式存储系统,并将多样化的上层计算框架和底层存储系统连接起来,统一数据访问方式。Alluxio以内存为中心的存储特性使得上层应用的数据访问速度比现有常规方案快几个数量级。此外,Alluxio提供的层次化存储、统一命名空间、世系关系、灵活的文件API、网页UI以及命令行工具等特性也方便了用户在不同实际应用场景下的使用。在本文中,我们将结合具体案例做进一步地阐述。

去哪儿网实时日志流处理系统的Alluxio异地存储实践

在我们的案例中,整个流处理计算系统部署在一个物理集群上,Mesos负责资源的管理和分配,Spark Streaming和Flink是主要的流计算引擎;存储系统HDFS位于另外一个远端机房,用于备份存储整个公司的日志信息;Alluxio则是作为核心存储层,与计算系统部署在一起。业务流水线每天会产生4.5TB左右的数据写入存储层,同时通过Kafka消费大约60亿条日志与存储层中的数据进行碰撞分析。Alluxio对整个流处理系统带来的价值主要包括:

  1. 利用Alluxio的分层存储特性,综合使用了内存、SSD和磁盘多种存储资源。通过Alluxio提供的LRU、LFU等缓存策略可以保证热数据一直保留在内存中,冷数据则被持久化到level 2甚至level 3的存储设备上;而HDFS作为长期的文件备份系统。
  2. 利用Alluxio支持多个计算框架的特性,通过Alluxio实现Spark以及Zeppelin等计算框架之间的数据共享,并且达到内存级的文件传输速率;此外,我们计划将Flink和Presto业务迁移到Alluxio上。
  3. 利用Alluxio的统一命名空间特性,便捷地管理远程的HDFS底层存储系统,并向上层提供统一的命名空间,计算框架和应用能够通过Alluxio统一访问不同的数据源的数据;
  4. 利用Alluxio提供的多种易于使用的API,降低了用户的学习成本,方便将原先的整个系统迁移至Alluxio,同时也使得调整的验证过程变得轻松许多;
  5. 利用Alluxio解决了原有系统中“Spark任务无法完成”的问题:原系统中当某个Spark executor失败退出后,会被Mesos重新调度到集群的任何一个节点上,即使设置了保留上下文,也会因为executor的“漂泊”而导致任务无法完成。新系统中Alluxio将数据的计算与存储隔离开来,计算数据不会因executor的“漂泊”而丢失,从而解决了这一问题。

本文剩余部分将详细对比分析Qunar原有流处理系统以及引入Alluxio改进后的流处理系统,最后简述我们下一步的规划和对Alluxio未来方向的期待。

原有系统架构以及相关问题分析

我们的实时流处理系统选择了Mesos作为基础架构层(Infrastructure Layer)。在原先的系统中,其余组件都运行在Mesos之上,包括Spark、Flink、Logstash以及Kibana等。其中主要用于流式计算的组件为Spark Streaming,在运行时Spark Streaming向Mesos申请资源,成为一个Mesos Framework,并通过Mesos调度任务。

去哪儿网实时日志流处理系统的Alluxio异地存储实践

如上图所示,在该流处理系统中,待处理的日志数据来自于多个数据源,由Kafka进行汇总,数据流在经过了Logstash集群清洗后再次写入Kafka暂存,后续由Spark Streaming和Flink等流式计算框架消费这些数据,计算的结果写入HDFS。在原先的数据处理过程中,主要存在着以下性能瓶颈:

  1. 用于存放输入和输出数据的HDFS位于一个远程存储集群中(物理位置上位于另一个机房)。本地计算集群与远程存储集群存在较高的网络延迟,频繁的远程数据交换成为整个流处理过程的一大瓶颈;
  2. HDFS的设计是基于磁盘的,其I/O性能,尤其是写数据性能难以满足流式计算所要求的延时;Spark Streaming在进行计算时,每个Spark executor都要从HDFS中读取数据,重复的跨机房读文件操作进一步地的拖慢了流式计算的整体效率;
  3. 由于Spark Streaming被部署在Mesos之上,当某个executor失效时,Mesos可能会在另一个节点重启这个executor,但是之前失效节点的checkpoint信息不能再被重复利用,计算任务无法顺利完成。而即便executor被重启在同一节点上,任务可以完成时,完成的速度也无法满足流式计算的要求。
  4. 在Spark Streaming中,若使用MEMORY_ONLY方式管理数据块,则会有大量甚至重复的数据位于Spark executor的JVM中,不仅增大了GC开销,还可能导致内存溢出;而如果采用MEMORY_TO_DISK或者DISK_ONLY的方式,则整体的流处理速度会受限于缓慢的磁盘I/O。

改进后的系统架构及解决方案

在引入Alluxio之后,我们很好地解决上述问题。在新的系统架构中,整个流式处理的逻辑基本不变。唯一变化的地方在于使用Alluxio代替原先的HDFS作为核心存储系统,而将原来的HDFS作为Alluxio的底层存储系统,用于备份。Alluxio同样运行在Mesos之上,各个计算框架和应用都通过Alluxio进行数据交换,由Alluxio提供高速的数据访问服务并维护数据的可靠性,仅将最终输出结果备份至远程HDFS存储集群中。

去哪儿网实时日志流处理系统的Alluxio异地存储实践

在新的系统架构中,最初的输入数据仍然经过Kafka过滤,交由Spark Streaming消费,不同的是,Spark Streaming在计算时产生的大量中间结果以及最终的输出都存放在Alluxio中,避免与较慢的远程HDFS集群进行交互,同时,存放在Alluxio中的数据也能够很方便地与上层组件,如Flink、Zeppelin进行共享。在整个过程中,Alluxio的一些重要特性对整个流水线的性能提升起到了重要的作用:

  1. 支持分层存储 — 我们在每个计算节点上都部署了Alluxio Worker,管理了本地的存储介质,包括内存、SSD和磁盘,构成了层次化的存储层。每个节点上流计算相关的数据会被尽可能的存放在本地,避免消耗网络资源。同时,Alluxio自身提供了LRU、LFU等高效的替换策略,能够保证热数据位于速度较快的内存层中,提高了数据访问速率;即便是冷数据也是存放在本地磁盘中,不会直接输出到远程HDFS存储集群;
  2. 跨计算框架数据共享 — 在新的系统架构中,除了Spark Streaming本身以外,其他组件如Zeppelin等也需要使用Alluxio中存放的数据。另外,Spark Streaming和Spark batch job可以通过Alluxio相连并从中读取或写入数据,来实现内存级别的数据传输;另外,我们还在将Flink相关的业务与逻辑迁移到Alluxio上,来实现计算框架间的高效数据共享。
  3. 统一命名空间 — 通过使用Alluxio分层存储中HDD层,来管理计算集群本地的持久存储,同时使用Alluxio的mount功能来管理远程的HDFS存储集群。Alluxio很自然地将HDFS以及Alluxio自身的存储空间统一管理起来。这些存储资源对于上层应用和计算框架透明的,只呈现了一个统一的命名空间,避免了复杂的输入输出逻辑;
  4. 简洁易用的API — Alluxio提供了多套易用的API,它的原生API是一套类似java.io的文件输入输出接口,使用其开发应用不需要繁杂的用户学习曲线;Alluxio提供了一套HDFS兼容的接口,即原先以HDFS作为目标存储的应用程序能够直接迁移至Alluxio,应用程序仅仅需要将原有的hdfs://替换成alluxio://就能正常工作,迁移的成本几乎是零。此外,Alluxio的命令行工具以及网页UI方便了开发过程中的验证和调试步骤,缩短了整个系统的开发周期。例如我们使用Chronos(一个Mesos的Framework,用来执行定时任务)在每天的凌晨通过Alluxio loadufs命令提前加载前一天由MapReduce计算好的数据到Alluxio中,以便后续的操作可以直接读取这些文件。
  5. Alluxio与Spark有着紧密的结合,我们在Spark Streaming将主要数据存放在Alluxio中而不是Spark executor的JVM中,由于存储位置同样是本地内存,因此不会拖慢数据处理的性能,反而能够降低Java GC的开销。同时,这一做法也避免了因同一节点上数据块的冗余而造成的内存溢出。我们还将SparkSteaming计算的中间结果即对RDD的checkpoint存储在Alluxio上。

通过利用Alluxio众多特性以及将数据从远程HDFS存储集群预取至本地Alluxio等优化方式,整个流处理流水线中的数据交互过程大量转移到本地集群的内存中,从而极大地提升了数据处理的整体吞吐率,降低了响应延时,满足了流处理的需求。从我们的线上实时监控的每次micro batch(间隔10分钟)的监控图中,可以看到平均处理吞吐量从由以前单个mirco batch周期内20至300的eps,提升到较为稳定的7800eps,平均的处理时间从8分钟左右降低到30至40秒以内,整个流处理加速16-300倍。尤其是在网络繁忙拥挤时,上百倍的加速效果尤为明显。

去哪儿网实时日志流处理系统的Alluxio异地存储实践

而对Kafka的消费指标来看,消费速度也从以前的200K条消息稳定提升到将近1200K。

去哪儿网实时日志流处理系统的Alluxio异地存储实践

此外,我们利用Alluxio自带的metrics组件将监控数据发送到graphite,以方便来监控Alluxio的JVM以及Alluxio的FileSystem状态。可以看到Alluxio Master对Heap内存占用率维持在低水平。

去哪儿网实时日志流处理系统的Alluxio异地存储实践

同期的文件数量和操作统计为下图所示。

去哪儿网实时日志流处理系统的Alluxio异地存储实践

未来展望

本文介绍的优化方法主要是针对利用Alluxio来解决异地存储访问慢的问题。性能提升的工作是永无止境的,最后我们也总结了一些未来的工作:

  • 我们线上环境中目前使用的Alluxio的版本是0.8.2,Spark Streaming计算的结果目前只能同步写入底层存储系统(在我们的案例中即为HDFS),我们已经测试了Alluxio 1.0.1 并准备上线新版本,得益于Alluxio社区活跃的开发,新版本的性能在很多方面都有更大的提升。
  • 我们计划将Flink的计算任务也迁移至Alluxio,同时我们也在计划修改Presto,令其可以同样享受Alluxio带来的跨计算引擎高速数据共享的功能;
  • 由于Alluxio能够很容易于现有存储系统进行整合并提升上层业务的性能,因此我们也将推广Alluxio到更多的业务线中,例如用于分析日志数据的批处理任务等。
原文  http://geek.csdn.net/news/detail/77491
正文到此结束
Loading...