本文主要研究一下CanalInstance
canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/CanalLifeCycle.java
/**
 * Minimal life-cycle contract: a component can be started, stopped, and
 * queried for whether it is currently started.
 */
public interface CanalLifeCycle {

    /** Starts the component. */
    void start();

    /** Stops the component. */
    void stop();

    /**
     * Reports the current state.
     *
     * @return {@code true} if the component is started
     */
    boolean isStart();
}
canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/AbstractCanalLifeCycle.java
public abstract class AbstractCanalLifeCycle implements CanalLifeCycle { protected volatile boolean running = false; // 是否处于运行中 public boolean isStart() { return running; } public void start() { if (running) { throw new CanalException(this.getClass().getName() + " has startup , don't repeat start"); } running = true; } public void stop() { if (!running) { throw new CanalException(this.getClass().getName() + " isn't start , please check"); } running = false; } }
canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstance.java
/**
 * A canal instance: a {@link CanalLifeCycle} that exposes its destination,
 * its collaborating components (parser, sink, store, meta manager, alarm
 * handler) and its MQ configuration.
 */
public interface CanalInstance extends CanalLifeCycle {

    /** @return the destination of this instance */
    String getDestination();

    /** @return the event parser of this instance */
    CanalEventParser getEventParser();

    /** @return the event sink of this instance */
    CanalEventSink getEventSink();

    /** @return the event store of this instance */
    CanalEventStore getEventStore();

    /** @return the meta manager of this instance */
    CanalMetaManager getMetaManager();

    /** @return the alarm handler of this instance */
    CanalAlarmHandler getAlarmHandler();

    /**
     * Invoked when a client subscribes or unsubscribes.
     *
     * @param identity the client whose subscription changed
     * @return a boolean result; its meaning is defined by the implementation
     */
    boolean subscribeChange(ClientIdentity identity);

    /** @return the MQ configuration of this instance */
    CanalMQConfig getMqConfig();
}
canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java
public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance { private static final Logger logger = LoggerFactory.getLogger(AbstractCanalInstance.class); protected Long canalId; // 和manager交互唯一标示 protected String destination; // 队列名字 protected CanalEventStore<Event> eventStore; // 有序队列 protected CanalEventParser eventParser; // 解析对应的数据信息 protected CanalEventSink<List<CanalEntry.Entry>> eventSink; // 链接parse和store的桥接器 protected CanalMetaManager metaManager; // 消费信息管理器 protected CanalAlarmHandler alarmHandler; // alarm报警机制 protected CanalMQConfig mqConfig; // mq的配置 //...... @Override public void start() { super.start(); if (!metaManager.isStart()) { metaManager.start(); } if (!alarmHandler.isStart()) { alarmHandler.start(); } if (!eventStore.isStart()) { eventStore.start(); } if (!eventSink.isStart()) { eventSink.start(); } if (!eventParser.isStart()) { beforeStartEventParser(eventParser); eventParser.start(); afterStartEventParser(eventParser); } logger.info("start successful...."); } @Override public void stop() { super.stop(); logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination }); if (eventParser.isStart()) { beforeStopEventParser(eventParser); eventParser.stop(); afterStopEventParser(eventParser); } if (eventSink.isStart()) { eventSink.stop(); } if (eventStore.isStart()) { eventStore.stop(); } if (metaManager.isStart()) { metaManager.stop(); } if (alarmHandler.isStart()) { alarmHandler.stop(); } logger.info("stop successful...."); } @Override public boolean subscribeChange(ClientIdentity identity) { if (StringUtils.isNotEmpty(identity.getFilter())) { logger.info("subscribe filter change to " + identity.getFilter()); AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter()); boolean isGroup = (eventParser instanceof GroupEventParser); if (isGroup) { // 处理group的模式 List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers(); for (CanalEventParser 
singleEventParser : eventParsers) {// 需要遍历启动 if(singleEventParser instanceof AbstractEventParser) { ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter); } } } else { if(eventParser instanceof AbstractEventParser) { ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter); } } } // filter的处理规则 // a. parser处理数据过滤处理 // b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据 // 后续内存版的一对多分发,可以考虑 return true; } @Override public String getDestination() { return destination; } @Override public CanalEventParser getEventParser() { return eventParser; } @Override public CanalEventSink getEventSink() { return eventSink; } @Override public CanalEventStore getEventStore() { return eventStore; } @Override public CanalMetaManager getMetaManager() { return metaManager; } @Override public CanalAlarmHandler getAlarmHandler() { return alarmHandler; } @Override public CanalMQConfig getMqConfig() { return mqConfig; } //...... }
canal-1.1.4/instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/CanalInstanceWithSpring.java
public class CanalInstanceWithSpring extends AbstractCanalInstance { private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithSpring.class); public void start() { logger.info("start CannalInstance for {}-{} ", new Object[] { 1, destination }); super.start(); } // ======== setter ======== public void setDestination(String destination) { this.destination = destination; } public void setEventParser(CanalEventParser eventParser) { this.eventParser = eventParser; } public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> eventSink) { this.eventSink = eventSink; } public void setEventStore(CanalEventStore<Event> eventStore) { this.eventStore = eventStore; } public void setMetaManager(CanalMetaManager metaManager) { this.metaManager = metaManager; } public void setAlarmHandler(CanalAlarmHandler alarmHandler) { this.alarmHandler = alarmHandler; } public void setMqConfig(CanalMQConfig mqConfig){ this.mqConfig = mqConfig; } }
CanalInstance接口继承了CanalLifeCycle接口,并额外定义了getDestination、getEventParser、getEventSink、getEventStore、getMetaManager、getAlarmHandler、subscribeChange、getMqConfig方法。