Step 1: Install SeaTunnel and plugins
Deploy SeaTunnel
Step 2: Deploy and configure Flink
Download
https://flink.apache.org/downloads/ (this guide uses flink-1.17.2-bin-scala_2.12.tgz)
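The download can also be scripted. The sketch below only builds the file name and URL for the release named above. The archive.apache.org directory layout and the use of wget are assumptions, not something this guide specifies, and the helper names flink_tarball and flink_url are hypothetical.

```shell
# Sketch: build the download URL for a given Flink release.
# Assumptions: the archive.apache.org mirror layout and the wget tool.
FLINK_VERSION="1.17.2"
SCALA_VERSION="2.12"

flink_tarball() {
  # Prints the archive file name, e.g. flink-1.17.2-bin-scala_2.12.tgz
  printf 'flink-%s-bin-scala_%s.tgz\n' "$1" "$2"
}

flink_url() {
  # Prints the assumed archive URL for the given Flink and Scala versions
  printf 'https://archive.apache.org/dist/flink/flink-%s/%s\n' \
    "$1" "$(flink_tarball "$1" "$2")"
}

# To download, uncomment the next line:
# wget "$(flink_url "$FLINK_VERSION" "$SCALA_VERSION")"
```

If the archive layout differs, pick the file from the downloads page instead and continue with the extraction step.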
Extract
$ tar -xzf flink-*.tgz
Browse the directory
cd flink-* && ls -l
Start Flink
Flink dashboard
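The guide does not show the start command itself. A minimal sketch, assuming a local standalone cluster started with the bin/start-cluster.sh script that ships in the Flink distribution; the function name start_flink and the FLINK_HOME variable are hypothetical.

```shell
# Sketch: start a local standalone Flink cluster from the extracted directory.
# FLINK_HOME is a hypothetical variable pointing at that directory.
start_flink() {
  # Use the first argument if given, otherwise fall back to FLINK_HOME
  flink_home="${1:-${FLINK_HOME:-}}"
  if [ ! -x "$flink_home/bin/start-cluster.sh" ]; then
    echo "start-cluster.sh not found under: $flink_home" >&2
    return 1
  fi
  "$flink_home/bin/start-cluster.sh"
}
```

For example, start_flink ./flink-1.17.2 starts the cluster. With the default configuration the dashboard should then be reachable at http://localhost:8081.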
Step 3: Add a job definition file
Edit config/v2.streaming.conf.template:
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}
source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}
transform {
  FieldMapper {
    source_table_name = "fake"
    result_table_name = "fake1"
    field_mapper = {
      age = age
      name = new_name
    }
  }
}
sink {
  Console {
    source_table_name = "fake1"
  }
}
Run the job (for Flink 1.15.x and later)
cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template
Console output
fields : name, age
types : STRING, INT
row=1 : elWaB, 1984352560
row=2 : uAtnp, 762961563
row=3 : TQEIB, 2042675010
row=4 : DcFjo, 593971283
row=5 : SenEb, 2099913608
row=6 : DHjkg, 1928005856
row=7 : eScCM, 526029657
row=8 : sgOeE, 600878991
row=9 : gwdvw, 1951126920
row=10 : nSiKE, 488708928
row=11 : xubpl, 1420202810
row=12 : rHZqb, 331185742
row=13 : rciGD, 1112878259
row=14 : qLhdI, 1457046294
row=15 : ZTkRx, 1240668386
row=16 : SGZCr, 94186144
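One quick sanity check is to count the row lines and compare the result with row.num = 16 from the FakeSource block. The sketch below assumes the output has been captured as text in the format shown above. The function name count_rows and the file name output.log are hypothetical, and real logs may carry extra prefixes that need a different pattern.

```shell
# Sketch: count the "row=" lines in captured console output.
# Assumes lines look like "row=1 : elWaB, 1984352560" as shown above.
count_rows() {
  # Reads text on stdin and prints the number of matching row lines.
  # grep exits non-zero when nothing matches, so "|| true" keeps the
  # function usable in scripts that run with "set -e".
  grep -cE '^[[:space:]]*row=[0-9]+ :' || true
}
```

For example, count_rows < output.log should print 16 for the job defined in this guide.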