展示了如何集成Confluent Kafka,KSQL,Spring Boot和YugaByte DB来开发用于管理物联网(IoT)传感器数据的应用程序。
场景 - 支持物联网的车队管理
一家货运公司希望跟踪其在全国范围内运送货物的物联网车辆。车辆属于不同类型(例如18轮车,公共汽车,大型卡车),并遵循3条交付路线(Route-37,Route-82,Route-43)。特别是,该公司希望跟踪:
应用架构
除了 Confluent Kafka 作为流媒体平台,该应用程序还具有以下组件:
下面是显示这些组件如何组合在一起的架构图。我们称之为Confluent Kafka,KSQL和YugaByte DB堆栈或CKY堆栈。
我们现在将详细介绍这些组件中的每一个。
数据存储
该层存储所有用户数据。YugaByte DB用作数据库, YugaByte云查询语言(YCQL) 用作数据库API。所有数据都存储在密钥空间TrafficKeySpace中。有一个Origin_Table用于存储原始事件的表。
CREATE TABLE TrafficKeySpace.Origin_Table ( vehicleId text, routeId text, vehicleType text, longitude text, latitude text, timeStamp timestamp, speed <b>double</b>, fuelLevel <b>double</b>, PRIMARY KEY ((vehicleId), timeStamp) ) WITH <b>default</b>_time_to_live = 3600;
请注意default_time_to_live设置为3600秒的值,以确保原始事件在1小时后自动删除。这是为了确保原始事件不会消耗数据库中的所有存储,并且在计算聚合后不久就会有效地从数据库中删除。
有三个表用于保存用于面向用户的显示的数据:
数据处理器不断更新这些表,仪表板从中读取。
以下是这些表:
CREATE TABLE TrafficKeySpace.Total_Traffic ( routeId text, vehicleType text, totalCount bigint, timeStamp timestamp, recordDate text, PRIMARY KEY (routeId, recordDate, vehicleType) ); CREATE TABLE TrafficKeySpace.Window_Traffic ( routeId text, vehicleType text, totalCount bigint, timeStamp timestamp, recordDate text, PRIMARY KEY (routeId, recordDate, vehicleType) ); CREATE TABLE TrafficKeySpace.Poi_Traffic( vehicleid text, vehicletype text, distance bigint, timeStamp timestamp, PRIMARY KEY (vehicleid) );
数据生产者
这包含生成模拟测试数据并将其发布到Kafka主题iot-data-event的程序。这模拟了使用现实世界中的消息代理从连接的车辆接收的数据。
单个数据点是JSON有效负载,如下所示:
{ <font>"vehicleId"</font><font>:</font><font>"0bf45cac-d1b8-4364-a906-980e1c2bdbcb"</font><font>, </font><font>"vehicleType"</font><font>:</font><font>"Taxi"</font><font>, </font><font>"routeId"</font><font>:</font><font>"Route-37"</font><font>, </font><font>"longitude"</font><font>:</font><font>"-95.255615"</font><font>, </font><font>"latitude"</font><font>:</font><font>"33.49808"</font><font>, </font><font>"timestamp"</font><font>:</font><font>"2017-10-16 12:31:03"</font><font>, </font><font>"speed"</font><font>:49.0, </font><font>"fuelLevel"</font><font>:38.0 } </font>
消费者读取上面的iot-data-event主题,将每个这样的事件转换为YCQL INSERT语句,然后调用YugaByte DB持久化到事件表TrafficKeySpace.Origin_Table。
数据处理器
KSQL 是Apache Kafka的流式SQL引擎。它为Kafka上的流处理提供了一个易于使用但功能强大的交互式SQL接口,无需使用Java或Python等编程语言编写代码。它支持广泛的流操作,包括数据过滤,转换,聚合,连接,窗口和会话。
使用KSQL的第一步是STREAM从原始事件创建一个iot-data-event如下所示。
CREATE STREAM traffic_stream ( vehicleId varchar, vehicleType varchar, routeId varchar, timeStamp varchar, latitude varchar, longitude varchar) WITH ( KAFKA_TOPIC='iot-data-event', VALUE_FORMAT='json', TIMESTAMP='timeStamp', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss');
现在可以在上面的流上运行各种聚合/查询,每种类型的查询的结果存储在它自己的新Kafka主题中。此应用程序使用3个此类查询/主题。此后, Kafka Connect YugaByte DB Sink Connector 读取这3个主题,并将结果保存到YugaByte DB中的3个相应表中。
CREATE TABLE total_traffic WITH ( PARTITIONS=1, KAFKA_TOPIC='total_traffic', TIMESTAMP='timeStamp', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS SELECT routeId, vehicleType, count(vehicleId) AS totalCount, max(rowtime) AS timeStamp, TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate FROM traffic_stream GROUP BY routeId, vehicleType; CREATE TABLE window_traffic WITH ( TIMESTAMP='timeStamp', KAFKA_TOPIC='window_traffic', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss', PARTITIONS=1) AS SELECT routeId, vehicleType, count(vehicleId) AS totalCount, max(rowtime) AS timeStamp, TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate FROM traffic_stream WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS) GROUP BY routeId, vehicleType; CREATE STREAM poi_traffic WITH ( PARTITIONS=1, KAFKA_TOPIC='poi_traffic', TIMESTAMP='timeStamp', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS SELECT vehicleId, vehicleType, <b>cast</b>(GEO_DISTANCE(<b>cast</b>(latitude AS <b>double</b>),<b>cast</b>(longitude AS <b>double</b>),33.877495,-95.50238,'KM') AS bigint) AS distance, timeStamp FROM traffic_stream WHERE GEO_DISTANCE(<b>cast</b>(latitude AS <b>double</b>),<b>cast</b>(longitude AS <b>double</b>),33.877495,-95.50238,'KM') < 30;
所有 Kafka Connect YugaByte DB Sink Connector 用于存储两个原始事件以及集合数据(这是使用KSQL生成)。它计算如下:
数据仪表板
这是一个 Spring Boot 应用程序,它从YugaByte DB查询数据,并使用 Web Sockets 和 jQuery 将数据推送到网页。数据以固定间隔推送到网页,因此数据将自动刷新。主UI页面使用 bootstrap.js 显示包含图表和表格的仪表板。
我们为三个表创建实体类Total_Traffic,Window_Traffic并poi_traffic为所有实体扩展创建数据访问对象(DAO)接口CassandraRepository。例如,我们为TotalTrafficData实体创建DAO类,如下所示。
@Repository <b>public</b> <b>interface</b> TotalTrafficDataRepository <b>extends</b> CassandraRepository<TotalTrafficData>{ @Query(<font>"SELECT * FROM traffickeyspace.total_traffic WHERE recorddate = ?0 ALLOW FILTERING"</font><font>) Iterable<TotalTrafficData> findTrafficDataByDate(String date); </font>
为了连接到YugaByte数据库集群并获得数据库操作的连接,我们还编写了一个 DatabaseConfig 类。
请注意,目前仪表板不访问原始事件表,仅依赖于存储在聚合表中的数据。
总结
此应用程序是使用Confluent Kafka,KSQL,Spring Boot和YugaByte DB构建IoT应用程序的蓝图。虽然这篇文章专注于本地集群部署,但Kafka代理和YugaByte数据库节点可以在真正的集群部署中进行水平扩展,以获得更多的应用程序吞吐量和容错能力。可以在 yb-iot-fleet-management GitHub仓库中找到构建和运行应用程序的说明以及源代码。