转载

使用嵌入式Debezium和SpringBoot捕获更改数据事件(CDC) - Sohan Ganapathy

在处理数据或复制数据源时,您可能已经听说过术语更改数据捕获(CDC)。顾名思义,“ CDC”是一种设计模式,可以持续识别并捕获数据的增量更改。该模式用于跨实时数据库到分析数据源或只读副本的实时数据复制。它还可以用于根据数据更改触发事件,例如 OutBox模式 。

大多数现代数据库通过事务日志支持CDC 。事务日志是对数据库所做的所有更改的顺序记录,而实际数据包含在单独的文件中。

在这个博客中,我想集中精力使用CDC常用的框架,并将其嵌入SpringBoot。

什么是Debezium?

Debezium是为CDC构建的分布式平台,它使用数据库事务日志并在行级别更改时创建事件流。侦听这些事件的应用程序可以基于增量数据更改来执行所需的操作。

Debezium提供了一个 连接器 库,支持当今可用的各种数据库。这些连接器可以监视和记录数据库模式中的行级更改,然后将更改发布到诸如 Kafka 的流服务上。

通常,将一个或多个连接器部署到 Kafka Connect 集群中,并配置为监视数据库并将数据更改事件发布到Kafka。分布式Kafka Connect群集可提供所需的容错能力和可伸缩性,从而确保所有已配置的连接器始终处于运行状态。

什么是嵌入式Debezium?

不需要容错和可靠性水平的应用程序,或者希望将运行整个平台的成本降至最低的应用程序,可以在应用程序中运行Debezium连接器。这是通过嵌入Debezium引擎并将连接器配置为在应用程序中运行来完成的。在发生数据更改事件时,连接器会将它们直接发送到应用程序。

使用SpringBoot运行Debezium

我们有一个SpringBoot应用程序“ Student CDC Relay”,它运行嵌入式Debezium,并追加包含“ Student”表的Postgres数据库的事务日志。当在“Student”表上执行诸如插入/更新/删除之类的数据库操作时,在SpringBoot应用程序中配置的Debezium连接器将在应用程序内调用一个方法。该方法对这些事件起作用,并在ElasticSearch上的Student索引中同步数据。

示例代码可在 此处 找到。

安装所需工具

可以在下面的docker-compose文件中安装所有必需的工具。这将在端口5432上启动Postgres数据库,并在端口9200(HTTP)和9300(Transport)上启动Elastic Search 。

version: <font>"3.5"</font><font>

services:
  # Install postgres and setup the student database.
  postgres:
    container_name: postgres
    image: debezium/postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=studentdb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password

  # Install Elasticsearch.
  elasticsearch:
    container_name: elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:6.8.0
    environment:
    - discovery.type=single-node
    ports:
      - 9200:9200
      - 9300:9300
</font>

我们使用镜像debezium/postgres,因为它预先构建了 逻辑解码 功能。这是一种机制,它允许提取已提交到事务日志的更改,从而使CDC成为可能。可以在 此处 找到将插件安装到Postgres的文档。

了解代码

第一步是为debezium-embedded和定义Maven依赖项debezium-connector。该示例从Postgres读取更改,因此我们使用Postgres连接器。

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium.version}</version>
</dependency>

然后,我们配置连接器,该连接器侦听Student表上的更改。我们使用PostgresConnector,对于connector.class这是由Debezium提供设置。这是追加数据库的连接器的Java类的名称。

连接器还具有一个重要的设置- offset.storage,它可以帮助应用程序从事务日志中跟踪已处理的数量。如果应用程序在处理过程中失败,它可以从重新启动后失败的地方继续读取更改。有多种存储偏移量的方法,但是在此示例中,我们使用类FileOffsetBackingStore将偏移量存储在由定义的本地文件中offset.storage.file.filename。连接器会记录文件中的偏移量,对于它读取的每个更改,Debezium引擎都会根据setting定期将偏移量刷新到文件中offset.flush.interval.ms。

连接器的其他参数是容纳Student表的Postgres数据库属性。

@Bean
<b>public</b> io.debezium.config.Configuration studentConnector() {
    <b>return</b> io.debezium.config.Configuration.create()
            .with(<font>"connector.class"</font><font>, </font><font>"io.debezium.connector.postgresql.PostgresConnector"</font><font>)
            .with(</font><font>"offset.storage"</font><font>,  </font><font>"org.apache.kafka.connect.storage.FileOffsetBackingStore"</font><font>)
            .with(</font><font>"offset.storage.file.filename"</font><font>, </font><font>"/path/cdc/offset/student-offset.dat"</font><font>)
            .with(</font><font>"offset.flush.interval.ms"</font><font>, 60000)
            .with(</font><font>"name"</font><font>, </font><font>"student-postgres-connector"</font><font>)
            .with(</font><font>"database.server.name"</font><font>, studentDBHost+</font><font>"-"</font><font>+studentDBName)
            .with(</font><font>"database.hostname"</font><font>, studentDBHost)
            .with(</font><font>"database.port"</font><font>, studentDBPort)
            .with(</font><font>"database.user"</font><font>, studentDBUserName)
            .with(</font><font>"database.password"</font><font>, studentDBPassword)
            .with(</font><font>"database.dbname"</font><font>, studentDBName)
            .with(</font><font>"table.whitelist"</font><font>, STUDENT_TABLE_NAME).build();
}
</font>

设置嵌入式Debezium的最后更改是在应用程序启动时启动它。为此,我们使用类 EmbeddedEngine ,该类充当连接器的包装器并管理连接器的生命周期。使用连接器配置和为每个数据更改事件将调用的函数(在我们的示例中为method)创建引擎handleEvent()。

<b>private</b> CDCListener(Configuration studentConnector, StudentService studentService) {
    <b>this</b>.engine = EmbeddedEngine
            .create()
            .using(studentConnector)
            .notifying(<b>this</b>::handleEvent).build();

    <b>this</b>.studentService = studentService;
}

在handleEvent()解析每个事件时,StudentService使用Spring Data JPA for Elasticsearch对发生的操作进行标识并调用,以在Elastic Search上执行创建/更新/删除操作。

现在我们已经设置好了,EmbeddedEngine我们可以使用该Executor服务异步启动它了。

<b>private</b> <b>final</b> Executor executor = Executors.newSingleThreadExecutor();

...

@PostConstruct
<b>private</b> <b>void</b> start() {
    <b>this</b>.executor.execute(engine);
}

@PreDestroy
<b>private</b> <b>void</b> stop() {
    <b>if</b> (<b>this</b>.engine != <b>null</b>) {
        <b>this</b>.engine.stop();
    }
}

看到实际的代码

一旦我们使用命令运行docker-compose文件并使用命令docker-compose up -d启动'Student CDC Relay' 启动了所有必需的工具mvn spring-boot:run。我们可以通过运行以下脚本来设置Student表:

CREATE TABLE <b>public</b>.student
(
    id integer NOT NULL,
    address character varying(255),
    email character varying(255),
    name character varying(255),
    CONSTRAINT student_pkey PRIMARY KEY (id)
);

为了查看运行中的代码,我们在刚创建的表上进行了数据更改。

将记录插入到学生表中:

运行下面的SQL将记录插入到Postgres的Student表中。

INSERT INTO STUDENT(ID, NAME, ADDRESS, EMAIL) VALUES('1','Jack','Dallas, TX','jack@gmail.com');

确认Elastic Search上数据已经更改:

$ curl -X GET http:<font><i>//localhost:9200/student/student/1?pretty=true</i></font><font>
{
  </font><font>"_index"</font><font> : </font><font>"student"</font><font>,
  </font><font>"_type"</font><font> : </font><font>"student"</font><font>,
  </font><font>"_id"</font><font> : </font><font>"1"</font><font>,
  </font><font>"_version"</font><font> : 31,
  </font><font>"_seq_no"</font><font> : 30,
  </font><font>"_primary_term"</font><font> : 1,
  </font><font>"found"</font><font> : <b>true</b>,
  </font><font>"_source"</font><font> : {
    </font><font>"id"</font><font> : 1,
    </font><font>"name"</font><font> : </font><font>"Jack"</font><font>,
    </font><font>"address"</font><font> : </font><font>"Dallas, TX"</font><font>,
    </font><font>"email"</font><font> : </font><font>"jack@gmail.com"</font><font>
  }
}
</font>

更新:

UPDATE STUDENT SET EMAIL='jill@gmail.com', NAME='Jill' WHERE ID = 1;

我们可以在Elastic Search上验证数据已更改为“Jill”

$ curl -X GET http:<font><i>//localhost:9200/student/student/1?pretty=true</i></font><font>
{
  </font><font>"_index"</font><font> : </font><font>"student"</font><font>,
  </font><font>"_type"</font><font> : </font><font>"student"</font><font>,
  </font><font>"_id"</font><font> : </font><font>"1"</font><font>,
  </font><font>"_version"</font><font> : 32,
  </font><font>"_seq_no"</font><font> : 31,
  </font><font>"_primary_term"</font><font> : 1,
  </font><font>"found"</font><font> : <b>true</b>,
  </font><font>"_source"</font><font> : {
    </font><font>"id"</font><font> : 1,
    </font><font>"name"</font><font> : </font><font>"Jill"</font><font>,
    </font><font>"address"</font><font> : </font><font>"Dallas, TX"</font><font>,
    </font><font>"email"</font><font> : </font><font>"jill@gmail.com"</font><font>
  }
}
</font>

最后的想法

这种方法确实非常简单,只有很少的活动部件,但是在缩放方面受到更大的限制,并且对故障的容忍度也大大降低。

当CDC-Relay应用程序运行良好时,将完全处理 一次  源记录,但是在CDC-Relay应用程序重新启动后,底层应用程序确实需要容忍接收重复事件。

我们可以通过在另一个端口上启动“ Student CDC Relay”的另一个实例来测试围绕缩放的限制,我们看到以下异常:

2019-11-20 12:33:17.901 ERROR 59453 --- [pool-2-thread-1] io.debezium.embedded.EmbeddedEngine      : Error <b>while</b> trying to run connector <b>class</b> 'io.debezium.connector.postgresql.PostgresConnector'

Caused by: org.postgresql.util.PSQLException: ERROR: replication slot <font>"debezium"</font><font> is active <b>for</b> PID <>
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440) ~[postgresql-42.2.5.jar:42.2.5]
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1116) ~[postgresql-42.2.5.jar:42.2.5]
    at org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:842) ~[postgresql-42.2.5.jar:42.2.5]
    at org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:58) ~[postgresql-42.2.5.jar:42.2.5]
    at org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:42) ~[postgresql-42.2.5.jar:42.2.5]
    at org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38) ~[postgresql-42.2.5.jar:42.2.5]
    at org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:37) ~[postgresql-42.2.5.jar:42.2.5]
</font>

如果您的应用程序需要所有消息 的至少一次 传送保证,最好将完整的分布式Debezium系统与Kafka-Connect一起使用。

原文  https://www.jdon.com/53411
正文到此结束
Loading...