canal 自身提供了简单的客户端,数据格式较为复杂,处理消费数据也不太方便,为了方便给业务使用,提供一种直接能获取实体对象的方式来进行消费才更方便。
先说一下实现的思路,首先canal 客户端的消息对象有两种,message 和 flatMessage,分别是普通的消息(protobuf格式)和消息队列的扁平消息(json格式),现在将这两种消息转化为我们直接使用的 model 对象,根据消息中的数据库表名称找到对应的的实体对象,那么如何根据数据库表名找到实体对象呢?
第一种方式,如果我们的实体对象都使用JPA 的 @Table注解来标识表和实体的对应关系,可以使用该注解来找到实体对象和表名的关系
第二种方式,可以使用自定义注解的来标注实体和表名的关系,为解耦各个表的处理,我们使用策略模式来封装各个表的增删改操作
canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的)
SimpleCanalConnector/ClusterCanalConnector : 两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制
SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.
client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.
canal 客户端可以主要分以下几种类型
这种方式下,可以启动多个客户端,连接同一个canal 服务端,多个客户端只有一个client 工作,其他的可以作为冷备,当一个client的挂了,其他的客户端会有一个进入工作模式
缺点:连接同一个服务端,如果服务端挂了将导致不可用
这种方式下,客户端连接多个canal服务端,一个客户端随机选择一个canal server 消费,当这个server 挂了,会选择另外一个进行消费
缺点:不支持订阅消费
使用zookeeper来server,client 的状态,当两个canal server 连接zookeeper 后,
优先连接的节点作为 活跃节点,client从活跃节点消费,当server挂了以后,从另外一个节点消费
缺点:不支持订阅消费
canal 支持消息直接发送到消息队列,从消息队列消费,目前支持的有kafka 和rocketMq,这种方式支持订阅消费
首先定义一个策略接口,定义增加,更新,删除功能,使用java 8声明方法为default,让客户端选择实现其中的方法,提高灵活性,客户端实现EntryHandler接口后,会返回基于handler中的泛型的实例对象,在对应的方法中实现自定义逻辑
public interface EntryHandler<T> { default void insert(T t) { } default void update(T before, T after) { } default void delete(T t) { } }
定义一个canalClient 的抽象类,封装canal 的链接开启关闭操作,启动一个线程不断去消费canal 数据,依赖一个 messageHandler 封装消息处理的逻辑
public abstract class AbstractCanalClient implements CanalClient { @Override public void start() { log.info("start canal client"); workThread = new Thread(this::process); workThread.setName("canal-client-thread"); flag = true; workThread.start(); } @Override public void stop() { log.info("stop canal client"); flag = false; if (null != workThread) { workThread.interrupt(); } } @Override public void process() { if (flag) { try { connector.connect(); connector.subscribe(filter); while (flag) { Message message = connector.getWithoutAck(batchSize, timeout, unit); log.info("获取消息 {}", message); long batchId = message.getId(); if (message.getId() != -1 && message.getEntries().size() != 0) { messageHandler.handleMessage(message); } connector.ack(batchId); } } catch (Exception e) { log.error("canal client 异常", e); } finally { connector.disconnect(); } } } }
基于该抽象类,分别提供各种客户端的实现
消息处理器 messageHandler 封装了消息处理逻辑,其中定义了一个消息处理方法
public interface MessageHandler<T> { void handleMessage(T t); }
消息处理器可能要适配4种情况,分别是消费message,flatMessage和两种消息的同步与异步消费
消息处理的工作主要有两个
首先我们封装一个抽象的 message 消息处理器,实现MessageHandler接口
public abstract class AbstractMessageHandler implements MessageHandler<Message> { @Override public void handleMessage(Message message) { List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) { try { EntryHandler entryHandler = HandlerUtil.getEntryHandler(entryHandlers, entry.getHeader().getTableName()); if(entryHandler!=null){ CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName()) .executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build(); CanalContext.setModel(model); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList(); CanalEntry.EventType eventType = rowChange.getEventType(); for (CanalEntry.RowData rowData : rowDataList) { rowDataHandler.handlerRowData(rowData,entryHandler,eventType); } } } catch (Exception e) { throw new RuntimeException("parse event has an error , data:" + entry.toString(), e); }finally { CanalContext.removeModel(); } } } } }
分别定义两个实现类,同步与异步实现类,继承AbstractMessageHandler抽象类
public class SyncMessageHandlerImpl extends AbstractMessageHandler { public SyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler) { super(entryHandlers, rowDataHandler); } @Override public void handleMessage(Message message) { super.handleMessage(message); } }
public class AsyncMessageHandlerImpl extends AbstractMessageHandler { private ExecutorService executor; public AsyncMessageHandlerImpl(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler, ExecutorService executor) { super(entryHandlers, rowDataHandler); this.executor = executor; } @Override public void handleMessage(Message message) { executor.execute(() -> super.handleMessage(message)); } }
消息处理器依赖的行消息处理器主要是将原始的column list 转为 实体对象,并将相应的增删改消息交给相应的hangler对象方法,行消息处理器分别需要处理两种对象,一个是 message的行数据 和 flatMessage 的行数据
public interface RowDataHandler<T> { void handlerRowData(T t, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception; }
两个行处理器的实现为
public class RowDataHandlerImpl implements RowDataHandler<CanalEntry.RowData> { private IModelFactory<List<CanalEntry.Column>> modelFactory; public RowDataHandlerImpl(IModelFactory modelFactory) { this.modelFactory = modelFactory; } @Override public void handlerRowData(CanalEntry.RowData rowData, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception { if (entryHandler != null) { switch (eventType) { case INSERT: Object object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList()); entryHandler.insert(object); break; case UPDATE: Set<String> updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated) .map(CanalEntry.Column::getName).collect(Collectors.toSet()); Object before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet); Object after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList()); entryHandler.update(before, after); break; case DELETE: Object o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList()); entryHandler.delete(o); break; default: break; } } } }
public class MapRowDataHandlerImpl implements RowDataHandler<List<Map<String, String>>> { private IModelFactory<Map<String,String>> modelFactory; public MapRowDataHandlerImpl(IModelFactory<Map<String, String>> modelFactory) { this.modelFactory = modelFactory; } @Override public void handlerRowData(List<Map<String, String>> list, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception{ if (entryHandler != null) { switch (eventType) { case INSERT: Object object = modelFactory.newInstance(entryHandler, list.get(0)); entryHandler.insert(object); break; case UPDATE: Object before = modelFactory.newInstance(entryHandler, list.get(1)); Object after = modelFactory.newInstance(entryHandler, list.get(0)); entryHandler.update(before, after); break; case DELETE: Object o = modelFactory.newInstance(entryHandler, list.get(0)); entryHandler.delete(o); break; default: break; } } } }
行消息处理的依赖的工厂 主要是是通过反射创建与表名称对应的bean实例
public interface IModelFactory<T> { Object newInstance(EntryHandler entryHandler, T t) throws Exception; default Object newInstance(EntryHandler entryHandler, T t, Set<String> updateColumn) throws Exception { return null; } }
目前主要用于保存bean实例以外的其他数据,使用threadLocal实现
代码已在github开源 canal-client