
A Look at CanalInstanceGenerator

This article looks at the CanalInstanceGenerator interface in canal 1.1.4 and its three implementations.

CanalInstanceGenerator

canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstanceGenerator.java

public interface CanalInstanceGenerator {

    /**
     * Generates the specific {@link CanalInstance} for a destination
     * 
     * @param destination
     * @return
     */
    CanalInstance generate(String destination);
}
  • CanalInstanceGenerator defines a single method, generate, which creates the CanalInstance for a given destination
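
To make the contract concrete, here is a minimal sketch of an implementation. It is self-contained, so CanalInstance is replaced by a one-method stand-in, and NamedInstance and NamedInstanceGenerator are hypothetical names that do not exist in canal.

```java
// Simplified stand-in for canal's CanalInstance; the real interface has more methods.
interface CanalInstance {
    String getDestination();
}

// Same shape as the interface quoted above.
interface CanalInstanceGenerator {
    CanalInstance generate(String destination);
}

// Hypothetical instance that only remembers which destination it serves.
class NamedInstance implements CanalInstance {
    private final String destination;

    NamedInstance(String destination) {
        this.destination = destination;
    }

    @Override
    public String getDestination() {
        return destination;
    }
}

// Hypothetical generator: builds a fresh NamedInstance on every call.
class NamedInstanceGenerator implements CanalInstanceGenerator {
    @Override
    public CanalInstance generate(String destination) {
        return new NamedInstance(destination);
    }
}
```

A caller only ever sees the interface: it passes a destination name and receives an instance bound to that name.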

PlainCanalInstanceGenerator

canal-1.1.4/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/PlainCanalInstanceGenerator.java

public class PlainCanalInstanceGenerator implements CanalInstanceGenerator {

    private static final Logger    logger      = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class);
    private String                 springXml;
    private PlainCanalConfigClient canalConfigClient;
    private String                 defaultName = "instance";
    private BeanFactory            beanFactory;
    private Properties             canalConfig;

    public PlainCanalInstanceGenerator(Properties canalConfig){
        this.canalConfig = canalConfig;
    }

    public CanalInstance generate(String destination) {
        synchronized (CanalInstanceGenerator.class) {
            try {
                PlainCanal canal = canalConfigClient.findInstance(destination, null);
                if (canal == null) {
                    throw new CanalException("instance : " + destination + " config is not found");
                }
                Properties properties = canal.getProperties();
                // merge local
                properties.putAll(canalConfig);

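                // set the dynamic properties, replacing the local properties
                com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal.set(properties);
                // set the destination currently being loaded; Spring uses this variable when it looks up files during loading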
                System.setProperty("canal.instance.destination", destination);
                this.beanFactory = getBeanFactory(springXml);
                String beanName = destination;
                if (!beanFactory.containsBean(beanName)) {
                    beanName = defaultName;
                }

                return (CanalInstance) beanFactory.getBean(beanName);
            } catch (Throwable e) {
                logger.error("generator instance failed.", e);
                throw new CanalException(e);
            } finally {
                System.setProperty("canal.instance.destination", "");
            }
        }
    }

    // ================ setter / getter ================

    private BeanFactory getBeanFactory(String springXml) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);
        return applicationContext;
    }

    public void setCanalConfigClient(PlainCanalConfigClient canalConfigClient) {
        this.canalConfigClient = canalConfigClient;
    }

    public void setSpringXml(String springXml) {
        this.springXml = springXml;
    }

}
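  • PlainCanalInstanceGenerator implements the CanalInstanceGenerator interface. Its generate method fetches the PlainCanal configuration for the destination through canalConfigClient.findInstance and throws a CanalException if none is found. It then merges canalConfig into canal.getProperties() with putAll, publishes the merged properties through PropertyPlaceholderConfigurer.propertiesLocal, sets the canal.instance.destination system property, builds a new ClassPathXmlApplicationContext from springXml, and returns the CanalInstance from beanFactory.getBean(beanName). The bean name is the destination if such a bean exists, otherwise the default "instance"; the finally block resets the system property to an empty string
  • As quoted, the logger of this class is created with SpringCanalInstanceGenerator.class, so its log lines are attributed to the Spring generator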
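
The "merge local" step relies on putAll overwriting existing keys, so on a key collision the value from canalConfig replaces the one fetched for the instance. The sketch below illustrates only that precedence rule. The class name PropertiesMergeSketch and the method mergeLocal are made up for illustration, and unlike the quoted code it copies into a new Properties object instead of mutating the fetched one.

```java
import java.util.Properties;

class PropertiesMergeSketch {

    // Mirrors the precedence of properties.putAll(canalConfig):
    // entries from "local" overwrite same-named entries from "remote".
    // Assumption: returns a copy, whereas the quoted code merges in place.
    static Properties mergeLocal(Properties remote, Properties local) {
        Properties merged = new Properties();
        merged.putAll(remote);
        merged.putAll(local);
        return merged;
    }
}
```

Keys that appear only in the fetched configuration survive the merge unchanged; only keys present on both sides take the local value.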

ManagerCanalInstanceGenerator

canal-1.1.4/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/ManagerCanalInstanceGenerator.java

public class ManagerCanalInstanceGenerator implements CanalInstanceGenerator {

    private CanalConfigClient canalConfigClient;

    public CanalInstance generate(String destination) {
        Canal canal = canalConfigClient.findCanal(destination);
        String filter = canalConfigClient.findFilter(destination);
        return new CanalInstanceWithManager(canal, filter);
    }

    // ================ setter / getter ================

    public void setCanalConfigClient(CanalConfigClient canalConfigClient) {
        this.canalConfigClient = canalConfigClient;
    }

}
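  • ManagerCanalInstanceGenerator implements the CanalInstanceGenerator interface. Its generate method uses canalConfigClient to look up the Canal and the filter for the destination, then creates a new CanalInstanceWithManager from them. Unlike the other two implementations, it involves no Spring context and no locking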
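
The shape of this generator, two lookups followed by a constructor call, can be sketched without canal on the classpath. Everything below is a stand-in: ConfigClientSketch, ManagedInstanceSketch and ManagerGeneratorSketch are hypothetical names, and the lookups return plain strings where the real client returns a Canal object and a filter string.

```java
// Hypothetical stand-in for CanalConfigClient; both lookups return strings here.
interface ConfigClientSketch {
    String findCanal(String destination);

    String findFilter(String destination);
}

// Hypothetical stand-in for CanalInstanceWithManager: it only holds its inputs.
class ManagedInstanceSketch {
    final String canal;
    final String filter;

    ManagedInstanceSketch(String canal, String filter) {
        this.canal = canal;
        this.filter = filter;
    }
}

// Same flow as the quoted generate(): look up config, look up filter, construct.
class ManagerGeneratorSketch {
    private final ConfigClientSketch client;

    ManagerGeneratorSketch(ConfigClientSketch client) {
        this.client = client;
    }

    ManagedInstanceSketch generate(String destination) {
        String canal = client.findCanal(destination);
        String filter = client.findFilter(destination);
        return new ManagedInstanceSketch(canal, filter);
    }
}
```

The sketch uses constructor injection for brevity; the quoted class receives its client through setCanalConfigClient instead.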

SpringCanalInstanceGenerator

canal-1.1.4/instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/SpringCanalInstanceGenerator.java

public class SpringCanalInstanceGenerator implements CanalInstanceGenerator {

    private static final Logger logger      = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class);
    private String              springXml;
    private String              defaultName = "instance";
    private BeanFactory         beanFactory;

    public CanalInstance generate(String destination) {
        synchronized (CanalInstanceGenerator.class) {
            try {
                // set the destination currently being loaded; Spring uses this variable when it looks up files during loading
                System.setProperty("canal.instance.destination", destination);
                this.beanFactory = getBeanFactory(springXml);
                String beanName = destination;
                if (!beanFactory.containsBean(beanName)) {
                    beanName = defaultName;
                }

                return (CanalInstance) beanFactory.getBean(beanName);
            } catch (Throwable e) {
                logger.error("generator instance failed.", e);
                throw new CanalException(e);
            } finally {
                System.setProperty("canal.instance.destination", "");
            }
        }
    }

    private BeanFactory getBeanFactory(String springXml) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);
        return applicationContext;
    }

    public void setSpringXml(String springXml) {
        this.springXml = springXml;
    }
}
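  • SpringCanalInstanceGenerator implements the CanalInstanceGenerator interface. While holding the lock on CanalInstanceGenerator.class, its generate method sets the canal.instance.destination system property, builds a new ClassPathXmlApplicationContext from springXml, and returns the CanalInstance from beanFactory.getBean(beanName), using the destination as the bean name if such a bean exists and the default "instance" otherwise. Any Throwable is logged and rethrown wrapped in a CanalException, and the finally block resets the system property to an empty string
  • PlainCanalInstanceGenerator synchronizes on the same CanalInstanceGenerator.class object, so generate calls on the two Spring-based generators are serialized against each other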
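
The lookup logic shared by the two Spring-based generators can be sketched without Spring by letting a Map stand in for the BeanFactory. BeanLookupSketch and its lookup method are hypothetical. One difference to keep in mind: the map returns null for a missing default bean, whereas Spring's getBean would throw.

```java
import java.util.Map;

class BeanLookupSketch {
    static final String DEFAULT_NAME = "instance";
    static final String DESTINATION_KEY = "canal.instance.destination";

    // Returns the bean registered under the destination name,
    // falling back to the bean registered under "instance".
    static Object lookup(Map<String, Object> beans, String destination) {
        // One class-level lock, as in the quoted code, because the
        // system property below is global state shared by all callers.
        synchronized (BeanLookupSketch.class) {
            try {
                System.setProperty(DESTINATION_KEY, destination);
                String beanName = beans.containsKey(destination) ? destination : DEFAULT_NAME;
                return beans.get(beanName);
            } finally {
                // Always reset the property, even if the lookup fails.
                System.setProperty(DESTINATION_KEY, "");
            }
        }
    }
}
```

Because the destination is passed to the context through a JVM-wide system property, two concurrent loads would overwrite each other's value; that is the reason the whole sequence runs inside one lock and the property is cleared in finally.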

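Summary

CanalInstanceGenerator defines the generate method, which creates the CanalInstance for a given destination. It has three implementations: PlainCanalInstanceGenerator, ManagerCanalInstanceGenerator, and SpringCanalInstanceGenerator. The Plain and Spring variants obtain the instance from a Spring context loaded from springXml, while the Manager variant constructs a CanalInstanceWithManager directly from the configuration returned by its config client.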

Original article: https://segmentfault.com/a/1190000022397646