原创

Spring Boot集成Debezium快速入门demo

1.Debezium介绍

Debezium是一个用来捕获数据库数据变更的分布式服务,你的应用可以看到这些数据变更,以及处理他们。Debezium以更改事件流的形式记录每张表的行级变更。然后应用可以以事件流产生的顺序读取事件流变更记录。目前支持的Source Connectors是Mysql,MongoDB,PostgresSQL、Oracle、SQL Server、Db2、Cassamdra、Vitesss。

2.mysql 环境安装,开启bin-log

采用docker-compose来搭建测试环境
version: '3'
services:
  mysql:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/mysql:5.7  # 原镜像`mysql:5.7`
    container_name: mysql_3306                                    # 容器名为'mysql_3306'
    restart: unless-stopped                                       # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
    volumes:                                                      # 数据卷挂载路径设置,将本机目录映射到容器目录
      - "./mysql/my.cnf:/etc/mysql/my.cnf"
      - "./mysql/init-file.sql:/etc/mysql/init-file.sql"
      - "./mysql/data:/var/lib/mysql"
#      - "./mysql/conf.d:/etc/mysql/conf.d"
      - "./mysql/log/mysql/error.log:/var/log/mysql/error.log"
      - "./mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d" # 可执行初始化sql脚本的目录 -- tips:`/var/lib/mysql`目录下无数据的时候才会执行(即第一次启动的时候才会执行)
    environment:                        # 设置环境变量,相当于docker run命令中的-e
      TZ: Asia/Shanghai
      LANG: en_US.UTF-8
      MYSQL_ROOT_PASSWORD: root         # 设置root用户密码
      MYSQL_DATABASE: demo              # 初始化的数据库名称
    ports:                              # 映射端口
      - "3306:3306"
启动
docker-compose -f docker-compose.yml -p mysql5.7 up -d
在demo数据库中创建表
create table user_info
(
    user_id     varchar(64)          not null
        primary key,
    username    varchar(100)         null comment '用户名',
    age         int(3)               null comment '年龄',
    gender      tinyint(1)           null comment '字典类型',
    remark      varchar(255)         null comment '描述',
    create_time datetime             null comment '创建时间',
    create_id   varchar(64)          null comment '创建人ID',
    update_time datetime             null comment '修改时间',
    update_id   varchar(64)          null comment '修改人ID',
    enabled     tinyint(1) default 1 null comment '删除状态(1-正常,0-删除)'
)
    comment '字典表';
查看bin-log是否开启
show variables like 'log_%';
50  

3.代码工程

实现利用debezium监听bin-log日志来捕获mysql变更数据

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>debezium</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <debezium.version>1.5.2.Final</debezium.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>${debezium.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>

    </dependencies>
</project>

application.yaml

timely:
  # 是否开启
  switch: true
  # 偏移量文件
  offset-file-name: D:\IdeaProjects\ETFramework\debezium\docker\offsets.dat
  # 是否启东时清除偏移量文件
  offset-file-clean: true
  # 偏移量提交时间 单位ms
  offset-time: 1
  # 读取历史记录文件
  history-file-name: D:\IdeaProjects\ETFramework\debezium\docker\dbhistory.dat
  # 读取的数据库信息
  offline:
    ip: 127.0.0.1
    port: 3306
    username: root
    password: root
    # 保证每个数据库读取的 instance-name  logic-name 不能相同
    # 实例名
    instance-name: mysql-connector
    # 逻辑名
    logic-name: mysql-customer
    # 读取的表
    include-table: dbo.vehicle
    # 读取的库
    include-db: demo
    #mysql.cnf 配置的 server-id
    server-id: 1

server:
  port: 8088

config

package com.et.debezium.config;

import cn.hutool.core.io.FileUtil;
import com.et.debezium.handler.ChangeEventHandler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

import java.util.concurrent.*;

/**
 * @author lei
 * @create 2021-06-22 15:36
 * @desc sql server 实时同步
 **/
@Configuration
@Log4j2
public class ChangeEventConfig {
    private final ChangeEventHandler changeEventHandler;

    @Value("${timely.offset-file-name}")
    private String offsetFileName;
    @Value("${timely.offset-file-clean:true}")
    private Boolean offsetFileDelete;
    @Value("${timely.offset-time}")
    private String offsetTime;
    @Value("${timely.history-file-name}")
    private String historyFileName;
    @Value("${timely.offline.instance-name}")
    private String instanceName;
    @Value("${timely.offline.logic-name}")
    private String logicName;
    @Value("${timely.offline.ip}")
    private String ip;
    @Value("${timely.offline.port}")
    private String port;
    @Value("${timely.offline.username}")
    private String username;
    @Value("${timely.offline.password}")
    private String password;
    @Value("${timely.offline.include-table}")
    private String includeTable;
    @Value("${timely.offline.include-db}")
    private String includeDb;
    @Value("${timely.offline.server-id}")
    private String serverId;

    @Autowired
    public ChangeEventConfig(ChangeEventHandler changeEventHandler) {
        this.changeEventHandler = changeEventHandler;
    }

    @Bean
    public void cleanFile() {
        if (offsetFileDelete && FileUtil.exist(offsetFileName)) {
            FileUtil.del(offsetFileName);
        }
    }

    /**
     * Debezium 配置.
     *
     * @return configuration
     */
        @Bean
        io.debezium.config.Configuration debeziumConfig() {
            return io.debezium.config.Configuration.create()
    //            连接器的Java类名称
                    .with("connector.class", MySqlConnector.class.getName())
    //            偏移量持久化,用来容错 默认值
                    .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
    //                偏移量持久化文件路径 默认/tmp/offsets.dat  如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
    //                如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。
                    .with("offset.storage.file.filename", offsetFileName)
    //                捕获偏移量的周期
                    .with("offset.flush.interval.ms", offsetTime)
    //               连接器的唯一名称
                    .with("name", instanceName)
    //                数据库的hostname
                    .with("database.hostname", ip)
    //                端口
                    .with("database.port", port)
    //                用户名
                    .with("database.user", username)
    //                密码
                    .with("database.password", password)
    //                 包含的数据库列表
                    .with("database.include.list", includeDb)
    //                是否包含数据库表结构层面的变更,建议使用默认值true
                    .with("include.schema.changes", "false")
    //                mysql.cnf 配置的 server-id
                    .with("database.server.id", serverId)
    //                 MySQL 服务器或集群的逻辑名称
                    .with("database.server.name", logicName)
    //                历史变更记录
                    .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
    //                历史变更记录存储位置,存储DDL
                    .with("database.history.file.filename", historyFileName)
                    .build();
        }
  /*  @Bean
    io.debezium.config.Configuration debeziumConfig() {
        return io.debezium.config.Configuration.create()
                //  连接器的Java类名称
                .with("connector.class", SqlServerConnector.class.getName())
                // 偏移量持久化,用来容错 默认值
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                // 要存储偏移量的文件路径 默认/tmp/offsets.dat  如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
                // 如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。
                .with("offset.storage.file.filename", offsetFileName)
                // 尝试提交偏移量的时间间隔。默认值为 1分钟
                .with("offset.flush.interval.ms", offsetTime)
                // 监听连接器实例的 唯一名称
                .with("name", instanceName)
                // SQL Server 实例的地址
                .with("database.hostname", ip)
                // SQL Server 实例的端口号
                .with("database.port", port)
                // SQL Server 用户的名称
                .with("database.user", username)
                // SQL Server 用户的密码
                .with("database.password", password)
                // 要从中捕获更改的数据库的名称
                .with("database.dbname", includeDb)
                // 是否包含数据库表结构层面的变更 默认值true
                .with("include.schema.changes", "false")
                // Debezium 应捕获其更改的所有表的列表
                .with("table.include.list", includeTable)
                // SQL Server 实例/集群的逻辑名称,形成命名空间,用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 架构名称以及 Avro 转换器时对应的 Avro 架构的命名空间用来
                .with("database.server.name", logicName)
                // 负责数据库历史变更记录持久化Java 类的名称
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                // 历史变更记录存储位置,存储DDL
                .with("database.history.file.filename", historyFileName)
                .build();
    }*/

    /**
     * 实例化sql server 实时同步服务类,执行任务
     *
     * @param configuration
     * @return
     */
    @Bean
    SqlServerTimelyExecutor sqlServerTimelyExecutor(io.debezium.config.Configuration configuration) {
        SqlServerTimelyExecutor sqlServerTimelyExecutor = new SqlServerTimelyExecutor();
        DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine
                .create(ChangeEventFormat.of(Connect.class))
                .using(configuration.asProperties())
                .notifying(changeEventHandler::handlePayload)
                .build();
        sqlServerTimelyExecutor.setDebeziumEngine(debeziumEngine);
        return sqlServerTimelyExecutor;
    }

    /**
     * @author lei
     * @version 1.0
     * @date 2021-06-22 15:39
     * @desc 同步执行服务类
     */
    @Data
    @Log4j2
    public static class SqlServerTimelyExecutor implements InitializingBean, SmartLifecycle {
        private final ExecutorService executor = ThreadPoolEnum.INSTANCE.getInstance();
        private DebeziumEngine<?> debeziumEngine;

        @Override
        public void start() {
            log.warn(ThreadPoolEnum.SQL_SERVER_LISTENER_POOL + "线程池开始执行 debeziumEngine 实时监听任务!");
            executor.execute(debeziumEngine);
        }

        @SneakyThrows
        @Override
        public void stop() {
            log.warn("debeziumEngine 监听实例关闭!");
            debeziumEngine.close();
            Thread.sleep(2000);
            log.warn(ThreadPoolEnum.SQL_SERVER_LISTENER_POOL + "线程池关闭!");
            executor.shutdown();
        }

        @Override
        public boolean isRunning() {
            return false;
        }

        @Override
        public void afterPropertiesSet() {
            Assert.notNull(debeziumEngine, "DebeZiumEngine 不能为空!");
        }

        public enum ThreadPoolEnum {
            /**
             * 实例
             */
            INSTANCE;

            public static final String SQL_SERVER_LISTENER_POOL = "sql-server-listener-pool";
            /**
             * 线程池单例
             */
            private final ExecutorService es;

            /**
             * 枚举 (构造器默认为私有)
             */
            ThreadPoolEnum() {
                final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(SQL_SERVER_LISTENER_POOL + "-%d").build();
                es = new ThreadPoolExecutor(8, 16, 60,
                        TimeUnit.SECONDS, new ArrayBlockingQueue<>(256),
                        threadFactory, new ThreadPoolExecutor.DiscardPolicy());
            }

            /**
             * 公有方法
             *
             * @return ExecutorService
             */
            public ExecutorService getInstance() {
                return es;
            }
        }
    }

}

handler

package com.et.debezium.handler;


import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSON;
import com.et.debezium.model.ChangeListenerModel;
import io.debezium.data.Envelope;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;

/**
 * @author lei
 * @create 2021-06-22 16:11
 * @desc 变更数据处理
 **/
@Service
@Log4j2
public class ChangeEventHandler {

    public static final String DATA = "data";
    public static final String BEFORE_DATA = "beforeData";
    public static final String EVENT_TYPE = "eventType";
    public static final String SOURCE = "source";
    public static final String TABLE = "table";

    private enum FilterJsonFieldEnum {
        /**
         * 表
         */
        table,
        /**
         * 库
         */
        db,
        /**
         * 操作时间
         */
        ts_ms,
        ;

        public static Boolean filterJsonField(String fieldName) {
            return Stream.of(values()).map(Enum::name).collect(Collectors.toSet()).contains(fieldName);
        }
    }

    /**
     * @author lei
     * @create 2021-06-24 16:04
     * @desc 变更类型枚举
     **/
    public enum EventTypeEnum {
        /**
         * 增
         */
        CREATE(1),
        /**
         * 删
         */
        UPDATE(2),
        /**
         * 改
         */
        DELETE(3),
        ;
        @Getter
        private final int type;

        EventTypeEnum(int type) {
            this.type = type;
        }
    }

    public void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                              DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
        for (RecordChangeEvent<SourceRecord> r : recordChangeEvents) {
            SourceRecord sourceRecord = r.record();
            Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
            if (sourceRecordChangeValue == null) {
                continue;
            }
            // 获取变更表数据
            Map<String, Object> changeMap = getChangeTableInfo(sourceRecordChangeValue);
            if (CollectionUtils.isEmpty(changeMap)) {
                continue;
            }
            ChangeListenerModel changeListenerModel = getChangeDataInfo(sourceRecordChangeValue, changeMap);
            if (changeListenerModel == null) {
                continue;
            }
            String jsonString = JSON.toJSONString(changeListenerModel);
            log.info("发送变更数据:{}", jsonString);
        }
        try {
            recordCommitter.markBatchFinished();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private ChangeListenerModel getChangeDataInfo(Struct sourceRecordChangeValue, Map<String, Object> changeMap) {
        // 操作类型过滤,只处理增删改
        Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
        if (operation != Envelope.Operation.READ) {
            Integer eventType = null;
            Map<String, Object> result = new HashMap<>(4);
            if (operation == Envelope.Operation.CREATE) {
                eventType = EventTypeEnum.CREATE.getType();
                result.put(DATA, getChangeData(sourceRecordChangeValue, AFTER));
                result.put(BEFORE_DATA, null);
            }
            // 修改需要特殊处理,拿到前后的数据
            if (operation == Envelope.Operation.UPDATE) {
                if (!changeMap.containsKey(TABLE)) {
                    return null;
                }
                eventType = EventTypeEnum.UPDATE.getType();
                String currentTableName = String.valueOf(changeMap.get(TABLE).toString());
                // 忽略非重要属性变更
                Map<String, String> resultMap = filterChangeData(sourceRecordChangeValue, currentTableName);
                if (CollectionUtils.isEmpty(resultMap)) {
                    return null;
                }
                result.put(DATA, resultMap.get(AFTER));
                result.put(BEFORE_DATA, resultMap.get(BEFORE));
            }
            if (operation == Envelope.Operation.DELETE) {
                eventType = EventTypeEnum.DELETE.getType();
                result.put(DATA, getChangeData(sourceRecordChangeValue, BEFORE));
                result.put(BEFORE_DATA, getChangeData(sourceRecordChangeValue, BEFORE));
            }
            result.put(EVENT_TYPE, eventType);
            result.putAll(changeMap);
            return BeanUtil.copyProperties(result,ChangeListenerModel.class);
        }
        return null;
    }

    /**
     * 过滤非重要变更数据
     *
     * @param sourceRecordChangeValue
     * @param currentTableName
     * @return
     */
    private Map<String, String> filterChangeData(Struct sourceRecordChangeValue, String currentTableName) {
        Map<String, String> resultMap = new HashMap<>(4);
        Map<String, Object> afterMap = getChangeDataMap(sourceRecordChangeValue, AFTER);
        Map<String, Object> beforeMap = getChangeDataMap(sourceRecordChangeValue, BEFORE);
        //todo 根据表过滤字段
        resultMap.put(AFTER, JSON.toJSONString(afterMap));
        resultMap.put(BEFORE, JSON.toJSONString(beforeMap));
        return resultMap;
    }

    /**
     * 校验是否仅仅是非重要字段属性变更
     * @param currentTableName
     * @param afterMap
     * @param beforeMap
     * @param filterColumnList
     * @return
     */
    private boolean checkNonEssentialData(String currentTableName, Map<String, Object> afterMap,
                                          Map<String, Object> beforeMap, List<String> filterColumnList) {
        Map<String, Boolean> filterMap = new HashMap<>(16);
        for (String key : afterMap.keySet()) {
            Object afterValue = afterMap.get(key);
            Object beforeValue = beforeMap.get(key);
            filterMap.put(key, !Objects.equals(beforeValue, afterValue));
        }
        filterColumnList.parallelStream().forEach(filterMap::remove);
        if (filterMap.values().stream().noneMatch(x -> x)) {
            log.info("表:{}无核心资料变更,忽略此次操作!", currentTableName);
            return true;
        }
        return false;
    }


    public String getChangeData(Struct sourceRecordChangeValue, String record) {
        Map<String, Object> changeDataMap = getChangeDataMap(sourceRecordChangeValue, record);
        if (CollectionUtils.isEmpty(changeDataMap)) {
            return null;
        }
        return JSON.toJSONString(changeDataMap);
    }

    public Map<String, Object> getChangeDataMap(Struct sourceRecordChangeValue, String record) {
        Struct struct = (Struct) sourceRecordChangeValue.get(record);
        // 将变更的行封装为Map
        Map<String, Object> changeData = struct.schema().fields().stream()
                .map(Field::name)
                .filter(fieldName -> struct.get(fieldName) != null)
                .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                .collect(toMap(Pair::getKey, Pair::getValue));
        if (CollectionUtils.isEmpty(changeData)) {
            return null;
        }
        return changeData;
    }

    private Map<String, Object> getChangeTableInfo(Struct sourceRecordChangeValue) {
        Struct struct = (Struct) sourceRecordChangeValue.get(SOURCE);
        Map<String, Object> map = struct.schema().fields().stream()
                .map(Field::name)
                .filter(fieldName -> struct.get(fieldName) != null && FilterJsonFieldEnum.filterJsonField(fieldName))
                .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                .collect(toMap(Pair::getKey, Pair::getValue));
        if (map.containsKey(FilterJsonFieldEnum.ts_ms.name())) {
            map.put("changeTime", map.get(FilterJsonFieldEnum.ts_ms.name()));
            map.remove(FilterJsonFieldEnum.ts_ms.name());
        }
        return map;
    }

}

model

package com.et.debezium.model;

import lombok.Data;

/**
 * @author lei
 * @create 2021-06-23 09:58
 * @desc sqlServer数据变更model
 **/
@Data
public class ChangeListenerModel {
    /**
     * 当前DB
     */
    private String db;
    /**
     * 当前表
     */
    private String table;
    /**
     * 操作类型 1add 2update 3 delete
     */
    private Integer eventType;
    /**
     * 操作时间
     */
    private Long changeTime;
    /**
     * 现数据
     */
    private String data;
    /**
     * 之前数据
     */
    private String beforeData;
}
package com.et.debezium;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

   public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
   }
}

4.测试

启动应用
2024-03-20 10:31:25.968 INFO 23260 --- [rce-coordinator] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-mysql-customer-binlog-client
2024-03-20 10:31:25.971 INFO 23260 --- [-127.0.0.1:3306] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-mysql-customer-binlog-client
2024-03-20 10:31:25.983 INFO 23260 --- [-127.0.0.1:3306] c.g.shyiko.mysql.binlog.BinaryLogClient : Connected to 127.0.0.1:3306 at mysql-bin.000001/154 (sid:1, cid:8)
2024-03-20 10:31:25.983 INFO 23260 --- [-127.0.0.1:3306] i.d.c.m.MySqlStreamingChangeEventSource : Connected to MySQL binlog at 127.0.0.1:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000001, currentBinlogPosition=154, currentRowNumber=0, serverId=0, sourceTime=2024-03-20T02:31:25.942Z, threadId=-1, currentQuery=null, tableIds=[demo.user], databaseName=demo], partition={server=mysql-customer}, snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000001, restartBinlogPosition=154, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null]
2024-03-20 10:31:25.983 INFO 23260 --- [rce-coordinator] i.d.c.m.MySqlStreamingChangeEventSource : Waiting for keepalive thread to start
2024-03-20 10:31:25.984 INFO 23260 --- [-127.0.0.1:3306] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-mysql-customer-binlog-client
2024-03-20 10:31:26.090 INFO 23260 --- [rce-coordinator] i.d.c.m.MySqlStreamingChangeEventSource : Keepalive thread is running
2024-03-20 10:33:36.536 WARN 23260 --- [-127.0.0.1:3306] io.debezium.util.SchemaNameAdjuster : The Kafka Connect schema name 'mysql-customer.demo.user_info.Value' is not a valid Avro schema name, so replacing with 'mysql_customer.demo.user_info.Value'
2024-03-20 10:33:36.536 WARN 23260 --- [-127.0.0.1:3306] io.debezium.util.SchemaNameAdjuster : The Kafka Connect schema name 'mysql-customer.demo.user_info.Key' is not a valid Avro schema name, so replacing with 'mysql_customer.demo.user_info.Key'
2024-03-20 10:33:36.537 WARN 23260 --- [-127.0.0.1:3306] io.debezium.util.SchemaNameAdjuster : The Kafka Connect schema name 'mysql-customer.demo.user_info.Envelope' is not a valid Avro schema name, so replacing with 'mysql_customer.demo.user_info.Envelope'
2024-03-20 10:33:36.540 INFO 23260 --- [-127.0.0.1:3306] i.d.r.history.DatabaseHistoryMetrics : Already applied 7 database changes
在user_info插入一些数据
INSERT INTO demo.user_info
(user_id, username, age, gender, remark, create_time, create_id, update_time, update_id, enabled)
VALUES('1', '1', 1, 1, '1', NULL, '1', NULL, '1', 1);
观察控制台输出
2024-03-20 10:34:13.156 INFO 23260 --- [listener-pool-0] c.e.debezium.handler.ChangeEventHandler : 发送变更数据:{"changeTime":1710902052000,"data":"{\"update_id\":\"1\",\"create_id\":\"1\",\"gender\":1,\"user_id\":\"1\",\"remark\":\"1\",\"enabled\":1,\"age\":1,\"username\":\"1\"}","db":"demo","eventType":1,"table":"user_info"}

5.引用

正文到此结束
Loading...