转载

大数据上的流式SQL引擎-StreamCQL

华为在近期的 华为开发者大会 上宣布开源其流处理平台SQL引擎StreamCQL(Stream Continuous Query Language),表示欢迎更多的开发者加入社区,并将持续提供新的特性,目前在 Github 网站 上已能看到其项目代码和文档。

在最近的一次 meetup 上,华为实时分析团队高级技术专家、StreamCQL核心作者何志强对该项目进行了介绍。StreamCQL作为华为FusionInsight大数据平台的重要组件,提供了在分布式流处理平台上的类SQL查询能力,包括窗口计算等高级特性,在开源之前已经在电信、金融等多个行业成功应用,此次StreamCQL开源目的是让更多的社区开发爱好者、包括用户、认证伙伴加入到项目的开源贡献和应用中,以更好的开源项目来服务用户,发展社区。华为作为大数据平台的重要提供商,最近加快了在开源上的步伐,在之前几个月,其开源了另一个大数据项目 astro (Spark SQL on HBase package)。

StreamCQL是一个类 SQL的声明式语言,它用于在流(streams)和可更新关系(updatable relation)上的可持续查询,目的是在流处理平台分布式计算能力之上,通过使用简易通用的类SQL语言,使得业务逻辑的开发变得统一和简易。在功能上,StreamCQL弥补了传统流处理平台上一些基本业务功能的缺失,除了过滤、转换等基本SQL能力之外, 还引入基于内存窗口的计算、统计、关联等能力,以及流数据的拆分、合并等功能。下图是StreamCQL的功能架构图。

大数据上的流式SQL引擎-StreamCQL

1:StreamCQL的功能架构图

StreamCQL在逻辑架构上和通用的 SQL引擎比较相似,包含了语法分析、语义分析、逻辑优化、物理执行计划和引擎适配这些部分。不同的是,由于处理的是流数据,所以StreamCQL必须定义流和窗口这些概念,并基于这些概念,定义相关的语法、算子并进行逻辑优化和物理规划。另外,通过引擎适配,StreamCQL除了可以支持目前主流的Storm流处理引擎,也可以在将来支持其它的流处理引擎,如Flink。

下面,先介绍一下 StreamCQL中对流和窗口的定义。

  • 流:一个流就是一组(无穷)元素的集合,我们用 <s, t> 表示元组s在时间t到达流。
  • 窗口就是一个有限范围内、任意一个时间点的数据状态快照。

大数据上的流式SQL引擎-StreamCQL

2:流和窗口的关系

窗口概念的提出,让流中的数据有了边界,此时窗口中的数据等同于数据库中的静态表,所以可以数据表一样被执行各种操作和查询,在后面我们会给出相关的示例。 StreamCQL支持的窗口种类也是比较多的,比如按事件数量、按时间范围和按自然天的窗口,同时在窗口的移动上,也支持两种方式,即跳动窗口和滑动窗口,如图3和图4所示。可以看出,跳动窗口是在固定的时间间隔下进行窗口的移动,一次性清除窗口中之前的数据,而滑动窗口则是在窗口时钟的驱动下,窗口随之滑动,将滑动到窗口之外的数据清除。

大数据上的流式SQL引擎-StreamCQL

3:基于跳动窗口的示意图

大数据上的流式SQL引擎-StreamCQL

4:基于滑动窗口的示意图

由于 SteamSQL采用了类SQL的语法,所以在编写应用时要比使用原生的Storm API简洁的多,图5是一个简单应用下的语法对比。

大数据上的流式SQL引擎-StreamCQL

5:StreamCQL和Storm API的语法对比

下面我们再举一些实际的例子,来说明 StreamCQL的在流处理场景中的实际使用。

第一个流处理示例比较简单,该示例会从 Kafka中读取数据,然后统计条数,之后将统计结果写入Kafka的另外一个topic中。

1) 首先创建一个输入流 s1,使用默认的反序列化方式来解析数据。

CREATE INPUT STREAM s1  -- 指定输入流的名称

(id INT, name STRING, type INT)  -- 流中字段类型

SOURCE KafkaInput -- 指定数据源及相关的属性

PROPERTIES (groupid = "cqlClient", topic = "cqlInput",

zookeepers = "127.0.0.1:2181", brokers = "127.0.0.1:9092" );

2) 然后创建输出流,使用默认的序列化方式输出数据。

CREATE OUTPUT STREAM s2  -- 指定输出流的名称

(cnt LONG)  -- 流中字段类型

SINK kafakOutput  -- 指定数据目的及相关属性

PROPERTIES ( topic = "cqlOut", zookeepers = "127.0.0.1:2181",

brokers = "127.0.0.1:9092" );

3) 接下来,执行从输入流转化到输出流的 StreamCQL语句,这个例子中只是统计事件的条数。

INSERT INTO STREAM s2 SELECT count(*) FROM s1;

4) 最后,将上述语句构成的 StreamCQL应用程序进行提交,并指定这个应用程序名称为cql_kafka_example。一旦提交,系统就开始进行应用程序的解析和发布。

SUBMIT APPLICATION cql_kafka_example;

有了上面的基础,我们可以看一个更复杂点的例子,这个例子中会将 2个输入流(s1、s2)的数据进行合并,并标识来源,然后将合并后的流与另一个输入流(s3)进行窗口的Join操作,最终生成输出流(rs):

1) 创建输入流 s1、s2和s3。

CREATE INPUT STREAM S1 (...) SOURCE KafkaInput PROPERTIES(...);

CREATE INPUT STREAM S2...;

CREATE INPUT STREAM S3...;

2) 创建输出流 RS。

CREATE OUTPUT STREAM RS(...) SINK kafkaOutput PROPERTIES(...)

3) S1和S2中的元组插入到临时流S4中,并添加常量作为来源判断字段,1表示来自s1,2表示来自s2。

INSERT INTO s4 SELECT *,1 FROM S1;INSERT INTO S4 SELECT *,2 FROM S2;

4) S4和S3的窗口数据经过内连接(inner join)后插入到输出流中,具体的,S4的窗口是记录数为10的跳动窗口,S3的窗口是3小时的滑动窗口。内连接的条件是s4的id字段等于s3的type字段,而且s4的id字段要大于5。

INSERT INTO STREAM rs

SELECT * FROM S4[ROWS 10 BATCH] 

INNER JOIN S3[RANGE 3 HOURS SLIDE]

ON s4.id=s3.type

WHERE s4.id > 5;

上面这个示例应用程序提交后生成的 Storm拓扑如图6所示。

大数据上的流式SQL引擎-StreamCQL

6:StreamCQL生成的Join功能的Storm拓扑

我们再看最后一个示例,和上面流的连接相反,这是个流拆分的例子,即从 1个输入流拆分出3个输出流。为了简单起见,我们只给出关键部分的语句:

FROM teststream  -- 指定输入流

INSERT INTO STREAM s1 SELECT *  -- 输入流所有字段都插入到输出流

s1中

INSERT INTO STREAM s2 SELECT a  -- 输入流中字段

a插入到输出流s2中

INSERT INTO STREAM s3 SELECT id, name WHERE id > 10  --输入流中字段

id、name插入到输出流s3中,且id必须大于10

PRARLLEL 4;  -- 输入算子的并发数量

这个示例最终生成的 Storm拓扑如图7所示:

大数据上的流式SQL引擎-StreamCQL

7:StreamCQL生成的Split功能的Storm拓扑

上述就是 StreamCQL的基本原理、架构和示例,简而言之,它为流式应用提供了非常便捷的类SQL的开发语言,你可以用它很方便的开发相关应用,完成基于流的各种查询统计。最后,StreamSQL开源后也有了一些新的规划方向,包括模式匹配、CQLServer、可靠性增强等。

(编辑:仲浩、李子健)

正文到此结束
Loading...