一、业务场景
以商品累计销量实时榜单为例,存储在 MySQL 中的原始订单表,通过 Flink 处理计算出产品销量的实时排行,并实时同步至 StarRocks 的主键模型表中。最终用户可以通过可视化工具连接StarRocks查看到实时刷新的榜单。
二、mysql和starrocks表结构和数据
mysql
CREATE TABLE `orders` (
`order_id` bigint NOT NULL,
`product_id` int DEFAULT NULL,
`order_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`customer_name` varchar(200) NOT NULL,
`product_name` varchar(200) NOT NULL,
`price` decimal(10,5) DEFAULT NULL,
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
INSERT INTO data_platform_test.orders
(order_id, product_id, order_date, customer_name, product_name, price)
VALUES(1, 1, '2024-02-28 16:55:49', '1', '1', 1.00000);
INSERT INTO data_platform_test.orders
(order_id, product_id, order_date, customer_name, product_name, price)
VALUES(5, 5, '2024-02-29 13:53:50', '5', '5', 5.00000);
INSERT INTO data_platform_test.orders
(order_id, product_id, order_date, customer_name, product_name, price)
VALUES(2, 2, '2024-02-28 16:55:49', '2', '2', 2.00000);
INSERT INTO data_platform_test.orders
(order_id, product_id, order_date, customer_name, product_name, price)
VALUES(3, 3, '2024-02-28 16:56:34', '3', '3', 3.00000);
INSERT INTO data_platform_test.orders
(order_id, product_id, order_date, customer_name, product_name, price)
VALUES(4, 1, '2024-02-28 16:56:34', '4', '1', 1.00000);
starrocks
CREATE TABLE IF NOT EXISTS `orders` (
`product_id` INT(11) NOT NULL COMMENT "",
`product_name` STRING NOT NULL COMMENT "",
`sales_cnt` BIGINT NOT NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`product_id`)
DISTRIBUTED BY HASH(`product_id`) BUCKETS 1
PROPERTIES (
"replication_num" = "3"
);
三、flink sql创建同步任务
进入sql客户端
./bin/sql-client.sh
编写sql任务
###创建数据库
CREATE DATABASE IF NOT EXISTS `default_catalog`.`demo`;
###创建源表
CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_src` (
`order_id` BIGINT NOT NULL,
`product_id` INT NULL,
`order_date` TIMESTAMP NOT NULL,
`customer_name` STRING NOT NULL,
`product_name` STRING NOT NULL,
`price` DECIMAL(10, 5) NULL,
PRIMARY KEY(`order_id`)
NOT ENFORCED
) with (
'connector' = 'mysql-cdc',
'hostname' = 'xxx.xxx.xxx.xxx',
'port' = '3306',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'xxx',
'table-name' = 'orders'
);
###创建sink表
CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_sink` (
`product_id` INT NOT NULL,
`product_name` STRING NOT NULL,
`sales_cnt` BIGINT NOT NULL,
PRIMARY KEY(`product_id`)
NOT ENFORCED
) with (
'sink.max-retries' = '10',
'jdbc-url' = 'jdbc:mysql://${ip}:9030',
'password' = '{password}',
'sink.properties.strip_outer_array' = 'true',
'sink.properties.format' = 'json',
'load-url' = '{fe}:8030',
'username' = 'xxx',
'sink.buffer-flush.interval-ms' = '15000',
'connector' = 'starrocks',
'database-name' = 'xxx',
'table-name' = 'orders'
);
###ETL任务
INSERT INTO `default_catalog`.`demo`.`orders_sink` select product_id,product_name, count(*) as cnt from `default_catalog`.`demo`.`orders_src` group by product_id,product_name;
INSERT INTO `default_catalog`.`demo`.`orders_sink` SELECT product_id,product_name, COUNT(*) AS cnt FROM `default_catalog`.`demo`.`orders_src` WHERE order_date >'2021-01-01 00:00:01' GROUP BY product_id,product_name;
四、测试
mysql源表数据如下:
order_id|product_id|order_date |customer_name|product_name|price |
--------+----------+-------------------+-------------+------------+-------+
1| 1|2024-02-28 16:55:49|1 |1 |1.00000|
5| 5|2024-02-29 13:53:50|5 |5 |5.00000|
2| 2|2024-02-28 16:55:49|2 |2 |2.00000|
3| 3|2024-02-28 16:56:34|3 |3 |3.00000|
4| 1|2024-02-28 16:56:34|4 |1 |1.00000|
经过ETL任务加工成目标表是如下样子的
product_id|product_name|sales_cnt|
----------+------------+---------+
1|1 | 2|
2|2 | 1|
3|3 | 1|
5|5 | 1|
五、注意事项
- 下载 Flink CDC connector。本示例的数据源为 MySQL,因此下载 flink-sql-connector-mysql-cdc-x.x.x.jar。并且版本需支持对应的 Flink 版本,两者版本支持度,请参见 Supported Flink Versions。由于本文使用 Flink 1.14.5,因此可以使用 flink-sql-connector-mysql-cdc-2.2.0.jar。
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar
- 下载 flink-connector-starrocks,并且其版本需要对应 Flink 的版本。
flink-connector-starrocks 的 JAR 包 (x.x.x_flink-y.yy_z.zz.jar) 会包含三个版本号:
- 第一个版本号 x.x.x 为 flink-connector-starrocks 的版本号。
- 第二个版本号 y.yy 为其支持的 Flink 版本号。
- 第三个版本号 z.zz 为 Flink 支持的 Scala 版本号。如果 Flink 为 1.14.x 以及之前版本,则需要下载带有 Scala 版本号的 flink-connector-starrocks。
由于本文使用 Flink 版本号 1.14.5,Scala 版本号 2.11,因此可以下载 flink-connector-starrocks JAR 包 1.2.3_flink-1.14_2.11.jar。
- 将 Flink CDC connector、Flink-connector-starrocks 的 JAR 包 flink-sql-connector-mysql-cdc-2.2.0.jar、1.2.3_flink-1.14_2.11.jar 移动至 Flink 的 lib 目录。
切记重启集群,否则汇报“flink table 查询报 unexpected block data”错误
六、引用