本文主要研究一下CanalInstanceGenerator
canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstanceGenerator.java
/**
 * Factory abstraction for {@link CanalInstance} objects.
 */
public interface CanalInstanceGenerator {

    /**
     * Produces the {@link CanalInstance} that belongs to the given destination.
     *
     * @param destination name of the destination the instance is created for
     * @return the {@link CanalInstance} generated for {@code destination}
     */
    CanalInstance generate(String destination);
}
canal-1.1.4/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/PlainCanalInstanceGenerator.java
public class PlainCanalInstanceGenerator implements CanalInstanceGenerator { private static final Logger logger = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class); private String springXml; private PlainCanalConfigClient canalConfigClient; private String defaultName = "instance"; private BeanFactory beanFactory; private Properties canalConfig; public PlainCanalInstanceGenerator(Properties canalConfig){ this.canalConfig = canalConfig; } public CanalInstance generate(String destination) { synchronized (CanalInstanceGenerator.class) { try { PlainCanal canal = canalConfigClient.findInstance(destination, null); if (canal == null) { throw new CanalException("instance : " + destination + " config is not found"); } Properties properties = canal.getProperties(); // merge local properties.putAll(canalConfig); // 设置动态properties,替换掉本地properties com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal.set(properties); // 设置当前正在加载的通道,加载spring查找文件时会用到该变量 System.setProperty("canal.instance.destination", destination); this.beanFactory = getBeanFactory(springXml); String beanName = destination; if (!beanFactory.containsBean(beanName)) { beanName = defaultName; } return (CanalInstance) beanFactory.getBean(beanName); } catch (Throwable e) { logger.error("generator instance failed.", e); throw new CanalException(e); } finally { System.setProperty("canal.instance.destination", ""); } } } // ================ setter / getter ================ private BeanFactory getBeanFactory(String springXml) { ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml); return applicationContext; } public void setCanalConfigClient(PlainCanalConfigClient canalConfigClient) { this.canalConfigClient = canalConfigClient; } public void setSpringXml(String springXml) { this.springXml = springXml; } }
canal-1.1.4/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/ManagerCanalInstanceGenerator.java
public class ManagerCanalInstanceGenerator implements CanalInstanceGenerator { private CanalConfigClient canalConfigClient; public CanalInstance generate(String destination) { Canal canal = canalConfigClient.findCanal(destination); String filter = canalConfigClient.findFilter(destination); return new CanalInstanceWithManager(canal, filter); } // ================ setter / getter ================ public void setCanalConfigClient(CanalConfigClient canalConfigClient) { this.canalConfigClient = canalConfigClient; } }
canal-1.1.4/instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/SpringCanalInstanceGenerator.java
public class SpringCanalInstanceGenerator implements CanalInstanceGenerator { private static final Logger logger = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class); private String springXml; private String defaultName = "instance"; private BeanFactory beanFactory; public CanalInstance generate(String destination) { synchronized (CanalInstanceGenerator.class) { try { // 设置当前正在加载的通道,加载spring查找文件时会用到该变量 System.setProperty("canal.instance.destination", destination); this.beanFactory = getBeanFactory(springXml); String beanName = destination; if (!beanFactory.containsBean(beanName)) { beanName = defaultName; } return (CanalInstance) beanFactory.getBean(beanName); } catch (Throwable e) { logger.error("generator instance failed.", e); throw new CanalException(e); } finally { System.setProperty("canal.instance.destination", ""); } } } private BeanFactory getBeanFactory(String springXml) { ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml); return applicationContext; } public void setSpringXml(String springXml) { this.springXml = springXml; } }
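CanalInstanceGenerator定义了generate方法，用于根据指定的destination创建CanalInstance；它有三个实现类，分别是PlainCanalInstanceGenerator、ManagerCanalInstanceGenerator、SpringCanalInstanceGenerator。其中ManagerCanalInstanceGenerator通过CanalConfigClient查询Canal配置和filter后直接构造CanalInstanceWithManager；SpringCanalInstanceGenerator通过ClassPathXmlApplicationContext加载springXml，并按destination获取bean（容器中不存在同名bean时回退到默认的instance）；PlainCanalInstanceGenerator在此基础上先通过PlainCanalConfigClient获取实例配置，将本地canalConfig合并进去之后再加载spring容器。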