编者按:InfoQ开设新栏目“品味书香”,精选技术书籍的精彩章节,以及分享看完书留下的思考和收获,欢迎大家关注。本文节选自微软研究院研究员Dharmesh Kakadia著、DockerOne社区刘梦馨和崔婧雯翻译的《Mesos:大数据资源调度与大规模容器运行最佳实践》中的章节“Mesos上的复杂数据分析”。
本文将介绍多个在 Mesos 上进行复杂数据分析的框架。我们会介绍如何在 Mesos 上搭建 Storm 和 Spark Streaming 来处理实时数据流,以及如何在 Mesos 上运行 NoSQL 数据库 Canssandra。
本章包括如下内容:
大数据的爆炸式增长不仅体现在生产的数据量上,也体现在要求处理海量数据得到有意义结果的速度和多样性上。因此,数据和计算的速度推动开发人员开发实时流处理框架,同时,数据天然的多样性和松散性也促进了NoSQL的演进。
随着 物联网 ( Internet of Things , IoT )的兴起,传感器、社交媒体、机器事务、监控等都在大规模地高速产生数据。这些数据能够提供的信息非常有价值,但是如果数据的分析结果有延迟,或者仅能分析过期数据,那么这些数据就会丧失价值。前一章我们介绍了使用 Hadoop 和 Spark 可以处理的数据量。这些传统工具很适合被用来完成批处理或离线分析,但是它们不是为实时流分析或低延时应用程序设计的,比如,类SQL查询处理。
当流处理在现代数据架构中日益重要时,现代数据架构出现了其他一些组件。现代数据架构包括服务不同需求的不同组件。Lambda 架构(http://en.wikipedia.org/wiki/Lambda_architecture)是十分流行的设计数据架构的方式。它主要包括三层:
在 Mesos 上运行 Lambda 架构不仅可以共享资源,而且可以帮助容错。批处理层的理念是通过随时处理收集到的数据来导出处理结果的,比如,构建预测模型。前一章已经演示了在 Mesos 上如何使用 Hadoop 和 Spark 进行数据处理。Apache Hama(https://hama.apache.org)是批处理层的另一种框架。它是通用的块同步处理( Bulk Synchronous Processing , BSP )框架,在图像处理和矩阵计算领域非常有用。
随着多样化新数据的产生速度不断加快,仅依赖离线模型,周期性地离线处理数据已经不能满足需求。速度层的理念是在数据生成时就完成处理。该层主要进行流处理,也被称为 复杂事件处理 ( Complex Event Processing , CEP )或实时处理。Mesos 支持 Apache Samza、Apache Storm 和 Spark streaming 框架来实现速度层。Apache Samza(http://samza.apache.org)是构建在 Apache Kafka 框架之上的流处理框架。有项目正在将 Samza 集成到 Mesos 里(https://github.com/Banno/samza-mesos)。下一节会讨论 Apache Storm 和 Spark Streaming。注意不同的流处理框架使用不同的架构,在速度、支持的操作、一致性语义和可扩展性之间采用不同的权衡措施。
服务层负责存储批处理和速度层的输出,并且基于这些输出提供查询服务。Mesos 提供了越来越多的选择,为存储和服务数据构建可扩展层。
除了这三个层次之外,还需要三层之间的连接器来接收数据,发送到其他层。Apache Kafka(http://kafka.apache.org)是分布式发布-订阅(也称为pub/sub)消息系统。Pub/sub系统是现代数据架构的后台机制。它们以松散的方式将不同的数据处理框架连接起来,适用于多种应用场景。有项目正在积极地将Kafka集成到Mesos里(https://github.com/stealthly/kafka-mesos)。另外,随着越来越多的框架使用即将发布的 Mesos 里的持久化存储(https://issues.apache.org/jira/browse/MESOS-1554),Mesos 上的复杂数据处理会越来越健壮。这并不意味着,复杂数据分析需要复杂的工具集,Mesos 支持许多其他架构来满足复杂数据处理的多样化需求。本书撰写时,很多这样的项目还处在早期发展阶段,但是都已经取得了实质性进展。
Apache Storm 是实时分布式流事件处理引擎(https://storm.apache.org)。Storm 特点是事务性、可靠、可扩展、可容错,并且提供了易用的 API。和 MapReduce 相比,它的架构完全不同。MapReduce 系统,比如 Hadoop、Spark 等都将代码移动到数据附近。这意味着在 MapReduce 架构里,每个节点都有一些数据,每个节点也拥有完全相同的代码来生成结果。但是在 Storm 里,每个节点完成不同类型的处理,处理不同的数据流。
Storm 的主要抽象概念是流过节点的流(元组流),每个节点完成一些处理。元组非常通用,可以包含一系列任意类型的可序列化的对象。在 Storm 里,处理序列用拓扑来描述。拓扑永远在运行,在流数据到达时完成处理。
Storm 拓扑
上图展示了基本拓扑概念。拓扑包含数据源(spout)和数据操作(bolt)。Spout 是数据源。它们侦听数据源,将元组发送到拓扑里。每一次循环代表一次数据操作,完成一些处理,这些流动的元组和数据操作构成的 DAG(有向无环图)就是拓扑。
Storm 是主-从架构。Numbus 是主节点守护进程,负责协调和监控。工作节点运行 supervisor,执行拓扑的一部分。Nimbus 和 supervisor 通过 ZooKeeper 或者本地磁盘相互通信。
Storm有很多高级特性,比如,支持精细监控、事务语义使用 Trident 等,这些不在本书讨论范围。更多Storm 的内容,参考 Quinton Anderson 的《Storm 实时数据处理》。
Nathan Marz 开发了调度器和执行器的第一个版本,随后项目在社区(https://github.com/mesos/storm)里进一步发展。在 Mesos 上运行任意框架,都需要调度器来代表框架的任务向 Mesos 申请资源,也需要执行器运行这些任务。如下步骤可以完成 Mesos 上 Storm 的安装:
ubuntu@master:~ $ git clone https://github.com/mesos/storm ubuntu@master:~ $ cd storm
ubuntu@master:~/storm $ ./bin/build-release.sh downloadStormRelease
ubuntu@master:~/storm $ ./bin/build-release.sh apache-storm-*.zip
mesos.master.url: "zk://master:2181/mesos" storm.zookeeper.servers: - "master" nimbus.host: "master"
ubuntu@master:~/storm-mesos $ bin/storm-mesos nimbus
ubuntu@master:~/storm-mesos $ bin/storm ui
至此,Storm 已经运行在 Mesos 上了。Storm Web UI 如下图所示,显示集群配置有 0 个 supervisor,因为 supervisor 是在运行拓扑时按需创建的。
另外,在 Mesos UI 上,Storm 会被列为活动框架。现在就可以运行多种流处理作业了。Storm 项目在 examples/storm-starter 目录下有丰富的样例拓扑。运行 ExclamationTopology,该拓扑会在输入的单词后加上感叹号。ExclamationTopology 是一个基本拓扑,带有一个数据源 word,两个数据操作 exclaim1 和 exclaim2,以线性方式链接:
ubuntu@master:~/storm-mesos $ bin/storm-mesos jar examples/storm -starter/storm-starter-topologies-*.jar storm.starter.ExclamationTopology mytopology
注意:Storm 要求给定集群里,拓扑名称是唯一的。
上述命令会将 ExclamationTopology 提交到 Storm 集群,命名为 mytopology。可以使用如下命令行接口验证该拓扑正在运行:
ubuntu@master:~/storm-mesos $ bin/storm list
还可以使用 Storm Web 接口查看拓扑的各种信息。输出以及其他日志存储在 logs 目录下,感叹号之后会显示 Storm 项目的贡献者。https://storm.apache.org/documentation 可以帮助理解更多 Storm 理念及各种 Storm 命令。
Storm-mesos 使用基于 YAML 的配置。至少需要设置如下配置参数:
控制资源配置十分重要,因为这会严重影响到 Storm 的可扩展性和延时。Storm-mesos 使用如下配置调优资源:
Storm-mesos 还指出下表可选的配置参数:
参数 | 描述 | 默认值 |
mesos.framework.name | 指定 Mesos 框架的名称 | Storm!!! |
mesos.framework.role | 指定可以用来认证的框架角色 | * |
mesos.framework.checkpoint | 设定是否启用框架检查点 | False |
mesos.allowed.hosts(和 mesos.disallowed.hosts) | 指定允许(不允许)运行拓扑的主机列表 | |
mesos.offer.lru.cache.size | 指定 offer 的 LRU 缓存大小 | 1000 |
mesos.local.file.server.port | 用于本地文件服务器的端口 | 随机端口 |
mesos.master.failover.timeout.secs | 为主节点指定框架故障转移超时时间,以秒为单位 | 3600 |
mesos.supervisor.suicide.inactive.timeout.secs | 指定 supervisor 在空闲时决定杀掉自己的等待时间,以秒为单位 | 120 |
我们已经了解到 Spark 可以用来处理大量数据。Spark Streaming 是 Spark API 的扩展,用来处理流数据。它支持各种各样的输入数据源,包括 Twitter、HDFS、Kafka、Flume、Akka Actor、TCP socket 和 ZeroMQ。Spark Streaming 将输入数据流分解成小批量,然后 Spark 程序处理这些离散化的流。已处理的小批量数据可以流转做进一步处理,或者保存到 HDFS、数据库等上。
Spark Streaming 有 DStream 或者离散流(http://www.cs.berkeley.edu/~matei/papers/2012/hotcloud_spark_streaming.pdf)的基础抽象。Spark 内部,DStream 以 RDD 序列的形式存在,DStream 上的操作被转化为 Dstream 里 RDD 的操作。这样自然拥有了 RDD 的所有优势,比如一致性、检查点等。下图展示了 Spark 是如何启动流处理的。
Spark Streaming 架构
Spark Steaming 支持很多不同的操作,无状态和有状态操作都支持。 下表列出了目前支持的操作:
Spark Streaming 支持的转化
转化 | 描述 |
cogroup(stream2, [numTasks]) | 根据给定键值从数据流里将对应的元组连接到一起。也就是说,当输入是(k,v1)和(k,v2),会返回(k, list<v1>, list<v2>) |
count() | 以新的 DStream 的形式返回 RDD 源里的元素数目 |
countByValue() | 计算 RDD 源里的键值频次,以 DStream 的形式返回(key,frequency)对 |
filter(f) | 过滤,返回新的 DStream,包含函数 f 返回值为 true 的 RDD |
join(stream2, [numTasks]) | 将两个数据流连接到一起,也就是说,输入是(k,v1)和(k,v2)时,会返回(k,(v1,v2)) |
map(f) 和 flatMap(f) | 函数 map() 对 DStream 里的每个 RDD 执行f函数。flatMap(f) 与 map(f) 类似,但是可以映射到 0 或者多条记录 |
reduce(f) 和 reduceByKey(f, [numTasks]) | 创建单个元素 RDD 的 DStream,通过 f 函数实现值的聚合。reduceByKey 还会有额外参数控制并行度 |
repartition(numPartitions) | 将 DStream 重新分配,控制并行度 |
transform(f) | 将任意的函数 f 应用到 Dstream 里的每个 RDD 上,生成新的 DStream |
union(stream2) | 合并 DStream stream2 里的 RDD 和当前 DStream 里的 RDD |
updateStateByKey(f) | 通过更新函数 f 更新每个键和该键对应的当前状态的值,生成新的 Dstream |
Spark Streaming 也支持基于窗口的操作,也就是说可以操作数据的滑动窗。WindowLength 和 slideInterval 参数控制窗口和操作间隔。下表列出了支持的基于窗口的操作:
Spark Steaming 支持的基于窗口的转化
转化 | 描述 |
countByWindow(windowLength, slideInterval) | 返回 Steam 里的滑动窗内的元素个数 |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 返回新的 DStream,其中键的值和窗口内键的出现频率相同 |
window(windowLength, slideInterval) | 根据给定参数创建新的包括窗口内批数据的 DStream |
reduceByWindow(f, windowLength, slideInterval) | 创建新的 DStream,使用函数 f 聚合元素值 |
reduceByKeyAndWindow(f, windowLength, slideInterval, [numTasks]) 和 reduceByKeyAndWindow(f, invFunc, windowLength, slideInterval, [numTasks]) | 创建新的 DStream,其中每个键的值是函数 f 处理滑动窗内的元素的输出。也可以传入逆 reduce 函数做增量处理,从而提高效率 |
和 Spark 类似,这些操作是惰性执行的,如下输出操作才会触发计算:
Spark Streaming 支持的输出操作
输出操作 | 描述 |
foreachRDD(f) | 这是基本输出运算符,必须和其他操作联合使用(比如打印、保存到外部系统等) |
print() | 打印 DStream 里的每个批次数据的前十个元素 |
saveAsObjectFiles(prefix, [suffix]) saveAsTextFiles(prefix, [suffix]) saveAsHadoopFiles(prefix, [suffix]) | 将 DStream 分别保存到序列化文件,文本文件和 HDFS 文件里。文件名是 prefix-TIME_IN_MS[.suffix] |
如果已经在 Mesos 上运行了 Spark,那么随时都可以开始使用 Spark Steaming。无须为 Spark Streaming 做任何特殊配置。Spark 发行版在 examples 目录下包含多种样例,包括 Spark Streaming 样例。让我们运行 Spark Stream 样例之一,NetworkWordCount。该样例统计每秒给定网络流里给定单词出现的数量。
首先,需要创建网络流,可以在 TCP 端口使用 Netcat 发送文本。打开终端,键入如下命令,在端口 9999 启动 Netcat 服务器:
ubuntu@master:~ $ nc -lk 9999
现在在另一个终端,启动 Spark Steaming 样例:
ubuntu@master:~ $ cd spark ubuntu@master:~ $ ./bin/run-example streaming.NetworkWordCount localhost 9999
NetworkWordCount 已经在运行了,会侦听第一个终端启动的 Netcat 里的输入。它会打印单词及每秒该单词出现的频率。比如,如果我们在 Netcat 窗口键入 hello world,就会在 Spark Streaming 窗口看到如下输出:
(hello,1) (world,1)
Spark Streaming 在 Mesos 里开箱即用。但是,对系统进行调优以满足实时流处理的需求和优化资源的使用是至关重要的。如下是调优需要考虑的几个方面。
选择流数据的批量大小是能否及时处理输入数据的决定性因素。批量大小不能设置过小,否则集群资源可能被浪费。另一方面,如果批量大小设置过大,流计算可能跟不上。因此,推荐的方式是从对于应用程序而言比较保守的批量大小开始,然后逐步检测出更小的数值。如果每个批次数据端到端处理的速度能够比输入批次数据的速度快,那么就说明系统可以胜任当前的处理速度。要度量该时间,可以使用 Spark 提供的org.apache.spark.scheduler.StreamingListener 接口。持续增加的延时是系统不能处理当前数据的信号。
在生产环境运行 Spark Streaming 时,需要格外注意的重要配置参数是 spark.cleaner.ttl。该参数控制 Spark 记忆的元数据的长度。Spark 默认不会移除任何元数据,有太多元数据时,就会影响到流处理应用程序。另外,如果该值设置得过低,窗口操作可能就无法处理窗口长度内的 RDD。因此,spark.cleaner.ttl 的值必须设置得比流处理应用程序的最大窗口长度更大。如果没有设置 spark.cleaner.ttl,Spark 会用最近最少使用(LRU)的方式清除所有 RDD。另外,将 spark.streaming.unpersist 设置为 true 可以启动一种更为智能的反持久化方案,系统会计算出哪些 RDD 可以从内存里移除。另外,推荐使用 Java 虚拟机的并发标记和移除式垃圾回收器,因为这样允许很多小型的 GC 暂停,而不是一个大型的,这会让流处理延时更稳定。
使用可用集群资源并行化处理十分重要。必须给操作传输合适的 numTask 参数,默认值是 8。也可以通过 spark.default.parallelism 来改变该默认值。
必须考虑到如果驱动节点和工作节点发生故障时该如何处理,因为所有的中间数据都可以根据 RDD 处理链重新计算出来。为了确保驱动器节点的恢复,必须启动检查点(通过 ssc.checkpoint 参数),应用程序必须检查前一个检查点状态是否存在。如果输入源是网络链接而工作节点在复制前失败了,还可以将数据复制到其他节点上,但是可能会丢失一小部分数据。对于持久化输入存储,比如 HDFS,工作节点失败不会造成任何数据的丢失。Spark Streaming文档(http://spark.apache.org/docs/latest/streaming-programming-guide.html)详细讨论了 Spark Streaming 所提供的容错语义。
Mesos 任务启动的额外开销对于低延时应用程序,比如 Spark Streaming,可能是致命的。Spark Streaming 必须运行在细粒度 Mesos 模式下,来减少任务启动的额外开销,第 3 章对此有详细解释。另外,为了减少 GC 暂停,Spark Streaming 将 RDD 以序列化二进制的格式持久化存储下来。这样,序列化/反序列化的额外开销可能很大,推荐使用快速序列化框架,比如 Kryo(https://github.com/EsotericSoftware/kryo)。另外,序列化任务也可能减少任务的网络传输时间,从而降低任务启动的额外开销。
数十年里,SQL 一直是数据分析的主要工具。随着大数据的兴起,很多系统尝试将数据库应用到大规模复杂数据分析领域。这样的系统包括 Hive、Shark、Spark SQL 和 NoSQL 数据库,比如 Cassandra、Hypertable 等。可以将这些类型的工作负载都运行在 Mesos 上,同时利用 Mesos 的优势,包括资源共享、容错等。下节详细介绍在 Mesos 上安装 Cassandra 的步骤。
Apache Cassandra(http://cassandra.apache.org)是流行的 NoSQL 数据库。Cassandra 由 Facebook 发起,在很多大规模部署环境上起着重要作用。通过在 Mesos 上运行 Cassandra,可以利用 Mesos 的容错和扩展能力。Cassandra 非常适合 Mesos,因为其架构是完全去中心化的。
要想在 Mesos 上运行 Cassandra,需要调度器和 Mesos 协调 Cassandra 所需的资源,执行器实际运行 Cassandra 的守护进程。调度器还需要将所有发行版和配置文件复制到所有节点上。Cassandra 配置需要定制种子节点,一旦调度器接受了来自 Mesos 的 offer,种子节点就会包含到调度器里。下列是在 Mesos 上运行 Cassandra 的步骤:
ubuntu@master:~ $ wget http://downloads.mesosphere.io/cassandra/cassandra-mesos-2.0.5-1.tgz
ubuntu@master:~ $ tar xzf cassandra-mesos-*.tgz ubuntu@master:~ $ cd cassandra-mesos-*
选项 | 描述 | 默认值 |
mesos.executor.uri | 指定 Cassandra 执行器的位置 | http://downloads.mesosphere.io/cassandra/cassandra-mesos-2.0.5.tgz |
mesos.master.url | 指定 Mesos 主节点位置 | zk://localhost:2181/mesos |
java.library.path | 指定 Mesos 库的位置 | /usr/local/lib/libmesos.so |
cassandra.noOfHwNodes | 指定想要 Cassandra 运行在多少台机器上 | 1 |
resource.cpus resource.mem resource.disk | 每个节点上 Cassandra 的资源要求 | 0.1 2048 1000 |
ubuntu@master:~/cassandra-mesos $ bin/cassandra-mesos set -o errexit -o pipefail FRAMEWORK_HOME='dirname $0'/.. dirname $0 export MESOS_NATIVE_LIBRARY=$(sed -e 's/:[^:////]/="/g;s/$/"/g;s/ *=/=/g' "$FRAMEWORK_HOME"/conf/mesos.yaml | tr -d $'/'' | grep -v /# | grep java.library.path | sed 's/java.library.path=// g;s/"//g') … # Start Cassandra on Mesos … 0 [main] INFO mesosphere.cassandra.Main$ - Starting Cassandra on Mesos. … 114 [Thread-0] INFO mesosphere.cassandra.CassandraScheduler - Starting Cassandra cluster ${clusterName} for the first time. Allocating new ID for it. … I0429 19:36:36.742849 27508 sched.cpp:391] Framework registered with 20140429-193514-580538634-5050-25835-0000 175 [Thread-1] INFO mesosphere.cassandra.CassandraScheduler - Framework registered as 20140429-193514-580538634-5050-25835-0000 437 [Thread-2] INFO mesosphere.cassandra.CassandraScheduler - Got new resource offers ArrayBuffer(slave1) 455 [Thread-2] INFO mesosphere.cassandra.CassandraScheduler - resources offered: List((cpus,2.0), (mem,6489.0), (disk,7935.0), (ports,0.0)) 455 [Thread-2] INFO mesosphere.cassandra.CassandraScheduler - resources required: List((cpus,0.1), (mem,2048.0), (disk,1000.0)) 464 [Thread-2] INFO mesosphere.cassandra.CassandraScheduler - Accepted offer: slave1 …
这里,被截断的输出显示 Cassandra 集群已经注册到 Mesos 上了,并且从 slave1 接收到了资源 offer。现在 Cassandra 已经启动并运行了,在 Web UI 里应该能看到列出的框架里有 Cassandra Test Cluster。可以通过 Cassandra 查询语言( Cassandra Query Language ( CQL ))shell 与之交互。从命令行或者通过 Web UI(UI上 的Host 字段),选择运行 Cassandra 的任意主机,使用如下命令连接到 CQL 会话中:
ubuntu@master:~/cassandra-mesos $ bin/cqlsh <cassandra-host>
应该能够看到 Cassandra 提示符(>cqlsh)。现在就可以运行 Cassandra 查询了。另外注意 Mesos 上的 Cassandra 支持很多场景和功能(比如扩展性),本文对此没有深入介绍。
批量处理系统不再是开发人员可用的唯一数据处理工具,新的应用程序及要求不同类型数据的分析用例层出不穷,而不是只有传统ETL工具和框架,比如 Hadoop,支持的一种场景。因此,与其尝试让批量处理系统更快或扩展传统数据库来处理非结构化数据,倒不如使用正确的工具来完成这些工作,从而让这些应用程序的开发和扩展更为容易。
本章探讨了使用运行在 Mesos 上的 Storm 和 Spark Streaming 处理实时和流数据的可选方案。也讲解了如何使用运行在 Mesos 上的 Cassandra 实现更多探索性数据分析。Cassandra 也是 Mesos 上更为通用的应用程序实例之一。
《Mesos:大数据资源调度与大规模容器运行最佳实践》结合大量实例介绍了Mesos的使用方法、核心原理及框架开发的相关内容。通过这些内容读者可以在数据中心环境中利用Mesos搭建分布式系统、进行大数据分析及开发分布式应用。
本书分为8章分别从使用、开发和运维等角度全面展示了Mesos 作为数据中心内核的强大能力、设计方面的精髓及在工程中的最佳实践。它还介绍了Mesos项目的最新进展和未来的发展方向,并给出了大量参考文献和相关链接方便读者进一步深入了解Mesos。适合分布式系统的研发、运维人员及相关技术爱好者阅读。