许多公司使用 Apache Hadoop 等分布式文件系统来存储和分析数据。借助脱机 Hadoop 的流式传输分析,您可存储大量的大数据并实时分析它们。本文展示了一个使用 Spark Streaming 实现实时关键词检测的例子。
Spark Streaming 是 Spark API 的一个扩展,它支持对实时数据流执行可扩展的、容错的处理。Spark Streaming 拥有丰富的适配器,允许应用程序开发人员对各种数据源读写数据,包括 Hadoop 分布式文件系统 (HDFS)、Kafka、Twitter 等。
Spark Streaming 应用程序由一个或多个互联的、离散化的流 (DStream) 组成。每个 DStream 由一系列弹性分布式数据集 (RDD,Resilient Distributed Dataset) 组成,这些数据集是不可变的分布式数据集的抽象。Spark 支持不同的应用程序开发语言,包括 Java、Scala 和 Python。对于本文,我们将使用 Java 语言逐步展示如何开发关键词检测应用程序。
图 1 显示了这个关键词检测应用程序的总体视图。
弹性分布式数据集 (RDD) 是一个不可变的对象集合。每个对象被划分到集群的节点上且并行地执行。
SocketTextStream
允许您绑定并监听一个传输控制协议 (TCP) 套接字上的消息。SocketTextStream
的输出被提供给一个自定义流,后者使用当前关键词列表来查找相匹配的标记。TextFileStream
用于监视 Hadoop 目录。只要它检测到一个新文件,就会读取该文件并将其转换为 DStream。使用 TextFileStream
读取的值和一个自定义逻辑来更新内部关键词列表。每个 Spark Streaming 应用程序首先都需要一个流式处理上下文,如下面的代码段所示。“上下文” 要求您传递一个持续时间参数,该参数定义一个批次。
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
socketTextStream
绑定到一个指定的主机接口和给定的端口号,并生成 DStream。
JavaReceiverInputDStream lines = ssc.socketTextStream( hostname, port, StorageLevels.MEMORY_AND_DISK_SER);
textFileStream
使用 textFile 从 Hadoop 读取并行关键词字典文件。处理该文件后,会更新内部列表中的关键词。
JavaDStream<String> filelines = ssc.textFileStream("/tmp/Streamtest");
JavaDStream<String> updatedKeyWords = filelines.flatMap(new FlatMapFunction<String,String>() {
@Override
public Iterable<String> call(String x) {
final Pattern SPACE = Pattern.compile(" ");
String[] vec=SPACE.split(x);
List<String> ls=Arrays.asList(vec);
return ls;
}
});
updatedKeyWords.foreachRDD(new Function<JavaRDD<String>, Void> (){
public Void call(JavaRDD<String> rdd) {
rdd.foreach(new VoidFunction<String>(){
@Override
public void call(String x){
if(x!=null)
keywords.add(x);
}});
return null;
从 SocketStream
读取的 DStream 用于同关键词列表进行对比,如以下代码所示。使用命令 wordPresent.print();
时,结果会显示在控制台上。
JavaDStream<Boolean> wordPresent = lines.map(new Function<String, Boolean>() {
@Override
public Boolean call(String x) {
return keywords.contains(x);
}
});
wordPresent.print();
下面的清单给出了本文中所用示例的完整代码。
public final class KeywordDetect { private static final Pattern SPACE = Pattern.compile(" "); public static List<String> keywords=new ArrayList<String>(); public static void main(String[] args) { if (args.length < 2) { System.err.println("Usage: KeywordDetect <hostname> <port> <words>"); System.exit(1); } SparkConf sparkConf = new SparkConf().setAppName("KeywordDetect"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5)); JavaDStream<String> filelines = ssc.textFileStream("/tmp/Streamtest"); JavaReceiverInputDStream<String> lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); keywords.add("initial"); //Initialize keyword list JavaDStream<String> updatedKeyWords = filelines.flatMap(new FlatMapFunction<String,String>() { @Override public Iterable<String> call(String x) { final Pattern SPACE = Pattern.compile(" "); String[] vec=SPACE.split(x); List<String> ls=Arrays.asList(vec); return ls; } }); updatedKeyWords.foreachRDD(new Function<JavaRDD<String>, Void> (){ public Void call(JavaRDD<String> rdd) { rdd.foreach(new VoidFunction<String>(){ @Override public void call(String x){ //x=x+1; if(x!=null) keywords.add(x); //add newly read tokens to keyword list }}); return null; } }); JavaDStream<Boolean> wordPresent = lines.map(new Function<String, Boolean>() { @Override public Boolean call(String x) { return keywords.contains(x); //compare token received from socket against keywords list } }); JavaDStream<String> inputWords = lines.map(new Function<String, String>() { @Override public String call(String x) { return x; } }); wordPresent.print(); ssc.start(); ssc.awaitTermination(); } }
对于本文中的示例,我们使用 Maven 来安装和构建应用程序。如果使用 Maven,请确保在 pom.xml 中添加了合适的依赖项。这些依赖项主要是 spark-core 和 spark-streaming 库。
以下代码给出了我们应用程序中使用的 pom 依赖项代码段:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.2.1</version>
</dependency>
编译应用程序并创建 jar 文件后,使用下面的命令将应用程序提交到 Spark 调度程序:
spark-submit --class "org.apache.spark.examples.streaming.KeywordDetect" --master local[4] target/KeyWord-1.0.jar rvm.svl.ibm.com 9212
因为 Spark 实例在单个具有 4 个核心的单个主机上运行,所以我们为 –master 参数使用 local[4] 值。我们的应用程序接受两个参数:主机名和端口。
应用程序假设有一个服务器进程在端口 9212 上运行并发布数据。为了在测试环境中模拟一个服务器,我们使用 nc (netcat) Linux 命令:nc -l 9212
。
nc 命令绑定到 9212。我们传入终端中的任何输入内容都会转发到所有正在监听端口 9212 的客户端。
所有方面都正确设置后,所提交的作业会开始运行并监听端口 9212。您应在终端上得到以下确认消息:
15/09/06 01:43:31 INFO dstream.SocketReceiver:Connecting to rvm.svl.ibm.com:9121
15/09/06 01:43:31 INFO dstream.SocketReceiver:Connected to rvm.svl.ibm.com:9121
现在,让我们更新该程序使用的内部字典。第 1.2 节中的代码可监听 Hadoop 目录 /tmp/Streamtest 中的更改事件。如果尚未创建该目录,请首先创建它,然后使用下面给出的命令上传关键词文件:
hadoop fs -mkdir /tmp/Streamtest hadoop fs -put keywords /tmp/Streamtest
检测到一个新文件时,会执行后续的 RDD。
15/09/06 01:54:25 INFO dstream.FileInputDStream:New files at time 1441529665000 ms: hdfs://rvm.svl.ibm.com:8020/tmp/Streamtest/keyword
15/09/06 01:54:25 INFO storage.MemoryStore: ensureFreeSpace(272214) called with curMem=109298, maxMem=278302556
其中一个关键词是 "risk"。现在我提交 nc 中的关键词,如下面的清单所示。
[root@rvm Desktop]# nc -l 9121
risk
然后,Spark 会检测到该关键词并在控制台上标记为 true。
-------------------------------------------
Time:1441529995000 ms
-------------------------------------------
true
您可进一步增强此应用程序,从而处理完整的字符串,而不是单个标记。
您可将关键词检测状态写入到一个文件中,或者写入到一个 UI 呈现服务的端口。
如果出现 "connection refused error",可能是因为:
本文演示了如何使用 Spark Streaming 构建实时应用程序。我们还着重介绍了一个 Spark Streaming 应用程序的组成部分。使用此信息作为起点,有助于您使用 Spark Streaming 创建更复杂的应用程序。