转载

flink SQL 实现mysql同步数据到starrocks

一、业务场景

以商品累计销量实时榜单为例,存储在 MySQL 中的原始订单表,通过 Flink 处理计算出产品销量的实时排行,并实时同步至 StarRocks 的主键模型表中。最终用户可以通过可视化工具连接StarRocks查看到实时刷新的榜单。

二、mysql和starrocks表结构和数据

mysql

CREATE TABLE `orders` (
 `order_id` bigint NOT NULL,
 `product_id` int DEFAULT NULL,
 `order_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
 `customer_name` varchar(200) NOT NULL,
 `product_name` varchar(200) NOT NULL,
 `price` decimal(10,5) DEFAULT NULL,
 PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3

INSERT INTO data_platform_test.orders
(order_id, product_id, order_date, customer_name, product_name, price)
VALUES(1, 1, '2024-02-28 16:55:49', '1', '1', 1.00000);
INSERT INTO data_platform_test.orders
(order_id, product_id, order_date, customer_name, product_name, price)
VALUES(5, 5, '2024-02-29 13:53:50', '5', '5', 5.00000);
INSERT INTO data_platform_test.orders
(order_id, product_id, order_date, customer_name, product_name, price)
VALUES(2, 2, '2024-02-28 16:55:49', '2', '2', 2.00000);
INSERT INTO data_platform_test.orders
(order_id, product_id, order_date, customer_name, product_name, price)
VALUES(3, 3, '2024-02-28 16:56:34', '3', '3', 3.00000);
INSERT INTO data_platform_test.orders
(order_id, product_id, order_date, customer_name, product_name, price)
VALUES(4, 1, '2024-02-28 16:56:34', '4', '1', 1.00000);

starrocks

CREATE TABLE IF NOT EXISTS `orders` (
`product_id` INT(11) NOT NULL COMMENT "",
`product_name` STRING NOT NULL COMMENT "",
`sales_cnt` BIGINT NOT NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`product_id`)
DISTRIBUTED BY HASH(`product_id`) BUCKETS 1
PROPERTIES (
"replication_num" = "3"
);

三、flink sql创建同步任务

进入sql客户端
./bin/sql-client.sh
编写sql任务
###创建数据库
CREATE DATABASE IF NOT EXISTS `default_catalog`.`demo`;
###创建源表
CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_src` (
 `order_id` BIGINT NOT NULL,
 `product_id` INT NULL,
 `order_date` TIMESTAMP NOT NULL,
 `customer_name` STRING NOT NULL,
 `product_name` STRING NOT NULL,
 `price` DECIMAL(10, 5) NULL,
 PRIMARY KEY(`order_id`)
 NOT ENFORCED
) with (
 'connector' = 'mysql-cdc',
 'hostname' = 'xxx.xxx.xxx.xxx',
 'port' = '3306',
 'username' = 'xxx',
 'password' = 'xxx',
 'database-name' = 'xxx',
 'table-name' = 'orders'
);

###创建sink表
CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_sink` (
 `product_id` INT NOT NULL,
 `product_name` STRING NOT NULL,
 `sales_cnt` BIGINT NOT NULL,
 PRIMARY KEY(`product_id`)
NOT ENFORCED
) with (
 'sink.max-retries' = '10',
 'jdbc-url' = 'jdbc:mysql://${ip}:9030',
 'password' = '{password}',
 'sink.properties.strip_outer_array' = 'true',
 'sink.properties.format' = 'json',
 'load-url' = '{fe}:8030',
 'username' = 'xxx',
 'sink.buffer-flush.interval-ms' = '15000',
 'connector' = 'starrocks',
 'database-name' = 'xxx',
 'table-name' = 'orders'
);

###ETL任务
INSERT INTO `default_catalog`.`demo`.`orders_sink` select product_id,product_name, count(*) as cnt from `default_catalog`.`demo`.`orders_src` group by product_id,product_name;


INSERT INTO `default_catalog`.`demo`.`orders_sink` SELECT product_id,product_name, COUNT(*) AS cnt FROM `default_catalog`.`demo`.`orders_src` WHERE order_date >'2021-01-01 00:00:01' GROUP BY product_id,product_name;

四、测试

mysql源表数据如下:
order_id|product_id|order_date |customer_name|product_name|price |
--------+----------+-------------------+-------------+------------+-------+
 1| 1|2024-02-28 16:55:49|1 |1 |1.00000|
 5| 5|2024-02-29 13:53:50|5 |5 |5.00000|
 2| 2|2024-02-28 16:55:49|2 |2 |2.00000|
 3| 3|2024-02-28 16:56:34|3 |3 |3.00000|
 4| 1|2024-02-28 16:56:34|4 |1 |1.00000|
经过ETL任务加工成目标表是如下样子的
product_id|product_name|sales_cnt|
----------+------------+---------+
 1|1 | 2|
 2|2 | 1|
 3|3 | 1|
 5|5 | 1|

五、注意事项

  1. 下载 Flink CDC connector。本示例的数据源为 MySQL,因此下载 flink-sql-connector-mysql-cdc-x.x.x.jar。并且版本需支持对应的 Flink 版本,两者版本支持度,请参见 Supported Flink Versions。由于本文使用 Flink 1.14.5,因此可以使用 flink-sql-connector-mysql-cdc-2.2.0.jar。
    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar
    
  2. 下载 flink-connector-starrocks,并且其版本需要对应 Flink 的版本。
    flink-connector-starrocks 的 JAR 包 (x.x.x_flink-y.yy_z.zz.jar) 会包含三个版本号:
    • 第一个版本号 x.x.x 为 flink-connector-starrocks 的版本号。
    • 第二个版本号 y.yy 为其支持的 Flink 版本号。
    • 第三个版本号 z.zz 为 Flink 支持的 Scala 版本号。如果 Flink 为 1.14.x 以及之前版本,则需要下载带有 Scala 版本号的 flink-connector-starrocks。
    由于本文使用 Flink 版本号 1.14.5,Scala 版本号 2.11,因此可以下载 flink-connector-starrocks JAR 包 1.2.3_flink-1.14_2.11.jar
  3. 将 Flink CDC connector、Flink-connector-starrocks 的 JAR 包 flink-sql-connector-mysql-cdc-2.2.0.jar1.2.3_flink-1.14_2.11.jar 移动至 Flink 的 lib 目录。
  切记重启集群,否则汇报“flink table 查询报 unexpected block data”错误

六、引用

正文到此结束
Loading...