转载

聊聊CanalInstance

本文主要研究一下CanalInstance

CanalLifeCycle

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/CanalLifeCycle.java

public interface CanalLifeCycle {

    void start();

    void stop();

    boolean isStart();
}
  • CanalLifeCycle接口定义了start、stop、isStart方法

AbstractCanalLifeCycle

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/AbstractCanalLifeCycle.java

public abstract class AbstractCanalLifeCycle implements CanalLifeCycle {

    protected volatile boolean running = false; // 是否处于运行中

    public boolean isStart() {
        return running;
    }

    public void start() {
        if (running) {
            throw new CanalException(this.getClass().getName() + " has startup , don't repeat start");
        }

        running = true;
    }

    public void stop() {
        if (!running) {
            throw new CanalException(this.getClass().getName() + " isn't start , please check");
        }

        running = false;
    }

}
  • AbstractCanalLifeCycle实现了CanalLifeCycle接口,其定义了running属性,start方法设置running为true,stop设置running为false,isStart返回running值

CanalInstance

canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstance.java

public interface CanalInstance extends CanalLifeCycle {

    String getDestination();

    CanalEventParser getEventParser();

    CanalEventSink getEventSink();

    CanalEventStore getEventStore();

    CanalMetaManager getMetaManager();

    CanalAlarmHandler getAlarmHandler();

    /**
     * 客户端发生订阅/取消订阅行为
     */
    boolean subscribeChange(ClientIdentity identity);

    CanalMQConfig getMqConfig();
}
  • CanalInstance继承了CanalLifeCycle,它还定义了getDestination、getEventParser、getEventSink、getEventStore、getMetaManager、getAlarmHandler、subscribeChange、getMqConfig方法

AbstractCanalInstance

canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java

public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {

    private static final Logger                      logger = LoggerFactory.getLogger(AbstractCanalInstance.class);

    protected Long                                   canalId;                                                      // 和manager交互唯一标示
    protected String                                 destination;                                                  // 队列名字
    protected CanalEventStore<Event>                 eventStore;                                                   // 有序队列

    protected CanalEventParser                       eventParser;                                                  // 解析对应的数据信息
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink;                                                    // 链接parse和store的桥接器
    protected CanalMetaManager                       metaManager;                                                  // 消费信息管理器
    protected CanalAlarmHandler                      alarmHandler;                                                 // alarm报警机制
    protected CanalMQConfig                          mqConfig;                                                     // mq的配置

    //......

    @Override
    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();
        }

        if (!alarmHandler.isStart()) {
            alarmHandler.start();
        }

        if (!eventStore.isStart()) {
            eventStore.start();
        }

        if (!eventSink.isStart()) {
            eventSink.start();
        }

        if (!eventParser.isStart()) {
            beforeStartEventParser(eventParser);
            eventParser.start();
            afterStartEventParser(eventParser);
        }
        logger.info("start successful....");
    }

    @Override
    public void stop() {
        super.stop();
        logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });

        if (eventParser.isStart()) {
            beforeStopEventParser(eventParser);
            eventParser.stop();
            afterStopEventParser(eventParser);
        }

        if (eventSink.isStart()) {
            eventSink.stop();
        }

        if (eventStore.isStart()) {
            eventStore.stop();
        }

        if (metaManager.isStart()) {
            metaManager.stop();
        }

        if (alarmHandler.isStart()) {
            alarmHandler.stop();
        }

        logger.info("stop successful....");
    }

    @Override
    public boolean subscribeChange(ClientIdentity identity) {
        if (StringUtils.isNotEmpty(identity.getFilter())) {
            logger.info("subscribe filter change to " + identity.getFilter());
            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

            boolean isGroup = (eventParser instanceof GroupEventParser);
            if (isGroup) {
                // 处理group的模式
                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
                for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
                    if(singleEventParser instanceof AbstractEventParser) {
                        ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
                    }
                }
            } else {
                if(eventParser instanceof AbstractEventParser) {
                    ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
                }
            }

        }

        // filter的处理规则
        // a. parser处理数据过滤处理
        // b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据
        // 后续内存版的一对多分发,可以考虑
        return true;
    }


    @Override
    public String getDestination() {
        return destination;
    }

    @Override
    public CanalEventParser getEventParser() {
        return eventParser;
    }

    @Override
    public CanalEventSink getEventSink() {
        return eventSink;
    }

    @Override
    public CanalEventStore getEventStore() {
        return eventStore;
    }

    @Override
    public CanalMetaManager getMetaManager() {
        return metaManager;
    }

    @Override
    public CanalAlarmHandler getAlarmHandler() {
        return alarmHandler;
    }

    @Override
    public CanalMQConfig getMqConfig() {
        return mqConfig;
    }

    //......

}
  • AbstractCanalInstance继承了AbstractCanalLifeCycle,它覆盖了start、stop方法,其start方法分别启动metaManager、alarmHandler、eventStore、eventSink、eventParser,其stop方法分别关闭eventParser、eventSink、eventStore、metaManager、alarmHandler;其subscribeChange方法根据ClientIdentity的pattern创建AviaterRegexFilter,然后设置给eventParser

CanalInstanceWithSpring

canal-1.1.4/instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/CanalInstanceWithSpring.java

public class CanalInstanceWithSpring extends AbstractCanalInstance {

    private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithSpring.class);

    public void start() {
        logger.info("start CannalInstance for {}-{} ", new Object[] { 1, destination });
        super.start();
    }

    // ======== setter ========

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public void setEventParser(CanalEventParser eventParser) {
        this.eventParser = eventParser;
    }

    public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> eventSink) {
        this.eventSink = eventSink;
    }

    public void setEventStore(CanalEventStore<Event> eventStore) {
        this.eventStore = eventStore;
    }

    public void setMetaManager(CanalMetaManager metaManager) {
        this.metaManager = metaManager;
    }

    public void setAlarmHandler(CanalAlarmHandler alarmHandler) {
        this.alarmHandler = alarmHandler;
    }

    public void setMqConfig(CanalMQConfig mqConfig){
        this.mqConfig = mqConfig;
    }

}
  • CanalInstanceWithSpring继承了AbstractCanalInstance,它专门给注册到spring容器使用

小结

CanalInstance继承了CanalLifeCycle,它还定义了getDestination、getEventParser、getEventSink、getEventStore、getMetaManager、getAlarmHandler、subscribeChange、getMqConfig方法

doc

  • [CanalInstance]( https://github.com/alibaba/ca...

)

原文  https://segmentfault.com/a/1190000022361070
正文到此结束
Loading...