原创

Spring Boot集成canal快速入门demo

1.什么是canal?

canal 是阿里开源的一款 MySQL 数据库增量日志解析工具,提供增量数据订阅和消费。 canal_00  

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志(binary log), 日志中的记录叫做二进制日志事件(binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映到它自己的数据
mysql_replication

Canal 工作原理

  • Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 Canal )
  • Canal 解析 binary log 对象(原始为 byte 流)

2.环境搭建

搭建mysql

version: '3'
services:
  mysql:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/mysql:5.7
    container_name: mysql_3306
    restart: unless-stopped
    volumes:
      - "./mysql/my.cnf:/etc/mysql/my.cnf"
      - "./mysql/init-file.sql:/etc/mysql/init-file.sql"
      - "./mysql/data:/var/lib/mysql"
#      - "./mysql/conf.d:/etc/mysql/conf.d"
      - "./mysql/log/mysql/error.log:/var/log/mysql/error.log"
      - "./mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d" # init sql script directory -- tips: it can be excute  when `/var/lib/mysql` is empty
    environment:                        # set environment,equals docker run -e
      TZ: Asia/Shanghai
      LANG: en_US.UTF-8
      MYSQL_ROOT_PASSWORD: root         # set root password
      MYSQL_DATABASE: root              # init database name
    ports:                              # port mappping
      - "3306:3306"
注意my.cnf在windows系统必须为只读,否则忽略
docker-compose -f docker-compose.yml -p mysql57 up -d
查看日志是否开启
show variables like 'log_%';
如果启用了,这个查询将返回log_bin的值为ON。 7219851613665

搭建canal-server

下载我canal客户端,官方地址进行相应版本的安装包进行下载(注意:如果下载翻到本文最下面联系我): https://github.com/alibaba/canal/releases 29 修改配置文件
example/instance.properties
instance.properties 基本上只用改动 dbUsername 和 dbPassword就可以了
启动 8587158

3.代码工程

实验目的

实现canal读取mysql的变更数据

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>canal</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

    </dependencies>
</project>

client

package com.et.canal.client;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.context.CanalContext;
import top.javatool.canal.client.handler.EntryHandler;
import top.javatool.canal.client.model.CanalModel;

import java.util.Map;

@Slf4j
@Component
@CanalTable(value = "all")
public class AllConsumer implements EntryHandler<Map<String, String>> {
    @Override
    public void insert(Map<String, String> map) {
        log.info("add,{}", map);
    }

    @Override
    public void update(Map<String, String> before, Map<String, String> after) {
        // CanalModel
        CanalModel canal = CanalContext.getModel();
        log.info("update,update before={},update after={}", before, after);
    }

    @Override
    public void delete(Map<String, String> map) {
        log.info("delete,{}", map);
    }
}

application.yaml

canal:
  mode: simple
  filter: # 过滤表名,可以为空
  batch-size: 1
  timeout: 1
  server: 127.0.0.1:11111
  destination: example # canal-server中定义的实例名,不可以为空
  user-name: root
  password: root
  async: true # 必须是true,设为false在启动时会报MessageHandler bean找不到,具体原因没看

4.测试

  • 启动springboot应用
  • 更新数据库数据
  • 观察控制台变化

5.引用

 
正文到此结束
Loading...