本文主要研究一下CanalEventSink
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/CanalEventSink.java
/**
 * Sink stage contract: receives data together with the address it came from and the destination it
 * belongs to, and pushes it further down the pipeline.
 *
 * @param <T> type of the data handed to {@link #sink(Object, InetSocketAddress, String)}
 */
public interface CanalEventSink<T> extends CanalLifeCycle {

    /**
     * Interrupts the consumption currently in progress. Example: the parser module has switched over and
     * wants to temporarily abort the in-flight merge request and clean up the related context state.
     * See {@linkplain GroupEventSink} for an implementation that relies on this.
     */
    void interrupt();

    /**
     * Submits data to the sink.
     *
     * @param event the data to submit
     * @param remoteAddress address of the source the data was read from
     * @param destination name of the destination the data belongs to
     * @return whether the submission completed; an implementation may return {@code false} when it gives
     *     up (e.g. it was stopped or interrupted while waiting)
     * @throws CanalSinkException if the sink fails to process the data
     * @throws InterruptedException if the calling thread is interrupted while submitting
     */
    boolean sink(T event, InetSocketAddress remoteAddress, String destination) throws CanalSinkException,
                                                                                   InterruptedException;
}
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/AbstractCanalEventSink.java
public abstract class AbstractCanalEventSink<T> extends AbstractCanalLifeCycle implements CanalEventSink<T> { protected CanalEventFilter filter; protected List<CanalEventDownStreamHandler> handlers = new ArrayList<CanalEventDownStreamHandler>(); public void setFilter(CanalEventFilter filter) { this.filter = filter; } public void addHandler(CanalEventDownStreamHandler handler) { this.handlers.add(handler); } public CanalEventDownStreamHandler getHandler(int index) { return this.handlers.get(index); } public void addHandler(CanalEventDownStreamHandler handler, int index) { this.handlers.add(index, handler); } public void removeHandler(int index) { this.handlers.remove(index); } public void removeHandler(CanalEventDownStreamHandler handler) { this.handlers.remove(handler); } public CanalEventFilter getFilter() { return filter; } public List<CanalEventDownStreamHandler> getHandlers() { return handlers; } public void interrupt() { // do nothing } }
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java
public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> { private static final Logger logger = LoggerFactory.getLogger(EntryEventSink.class); private static final int maxFullTimes = 10; private CanalEventStore<Event> eventStore; protected boolean filterTransactionEntry = false; // 是否需要尽可能过滤事务头/尾 protected boolean filterEmtryTransactionEntry = true; // 是否需要过滤空的事务头/尾 protected long emptyTransactionInterval = 5 * 1000; // 空的事务输出的频率 protected long emptyTransctionThresold = 8192; // 超过8192个事务头,输出一个 protected volatile long lastTransactionTimestamp = 0L; protected AtomicLong lastTransactionCount = new AtomicLong(0L); protected volatile long lastEmptyTransactionTimestamp = 0L; protected AtomicLong lastEmptyTransactionCount = new AtomicLong(0L); protected AtomicLong eventsSinkBlockingTime = new AtomicLong(0L); protected boolean raw; public EntryEventSink(){ addHandler(new HeartBeatEntryEventHandler()); } public void start() { super.start(); Assert.notNull(eventStore); if (eventStore instanceof MemoryEventStoreWithBuffer) { this.raw = ((MemoryEventStoreWithBuffer) eventStore).isRaw(); } for (CanalEventDownStreamHandler handler : getHandlers()) { if (!handler.isStart()) { handler.start(); } } } public void stop() { super.stop(); for (CanalEventDownStreamHandler handler : getHandlers()) { if (handler.isStart()) { handler.stop(); } } } public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination) { return sinkData(entrys, remoteAddress); } //...... }
canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java
public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> { //...... private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress) throws InterruptedException { boolean hasRowData = false; boolean hasHeartBeat = false; List<Event> events = new ArrayList<Event>(); for (CanalEntry.Entry entry : entrys) { if (!doFilter(entry)) { continue; } if (filterTransactionEntry && (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) { long currentTimestamp = entry.getHeader().getExecuteTime(); // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常 if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold && Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) { continue; } else { lastTransactionCount.set(0L); lastTransactionTimestamp = currentTimestamp; } } hasRowData |= (entry.getEntryType() == EntryType.ROWDATA); hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT); Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw); events.add(event); } if (hasRowData || hasHeartBeat) { // 存在row记录 或者 存在heartbeat记录,直接跳给后续处理 return doSink(events); } else { // 需要过滤的数据 if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) { long currentTimestamp = events.get(0).getExecuteTime(); // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常 if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval || lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) { lastEmptyTransactionCount.set(0L); lastEmptyTransactionTimestamp = currentTimestamp; return doSink(events); } } // 直接返回true,忽略空的事务头和尾 return true; } } protected boolean doSink(List<Event> events) { for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) { events = handler.before(events); } long blockingStart = 0L; int fullTimes = 0; do { if (eventStore.tryPut(events)) { if 
(fullTimes > 0) { eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart); } for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) { events = handler.after(events); } return true; } else { if (fullTimes == 0) { blockingStart = System.nanoTime(); } applyWait(++fullTimes); if (fullTimes % 100 == 0) { long nextStart = System.nanoTime(); eventsSinkBlockingTime.addAndGet(nextStart - blockingStart); blockingStart = nextStart; } } for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) { events = handler.retry(events); } } while (running && !Thread.interrupted()); return false; } //...... }
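CanalEventSink 继承了 CanalLifeCycle，它定义了 sink、interrupt 两个方法；AbstractCanalEventSink 继承了 AbstractCanalLifeCycle，声明实现了 CanalEventSink 接口，它定义了 filter 及 handlers 两个属性，并提供了空实现的 interrupt 方法；EntryEventSink 继承了 AbstractCanalEventSink，声明实现了 CanalEventSink，其 start 方法会遍历 handlers，对尚未启动的 handler 执行 handler.start 方法；其 stop 方法会遍历 handlers，对已启动的 handler 执行 handler.stop 方法；其 sink 方法执行的是 sinkData 方法。sinkData 方法会对 entry 进行过滤并转换为 Event，若存在 ROWDATA 或 HEARTBEAT 记录则直接执行 doSink；否则在 filterEmtryTransactionEntry 为 true 时，按时间间隔（emptyTransactionInterval）及数量阈值（emptyTransctionThresold）有选择地放过空的事务头/尾，以便及时更新数据库位点，其余情况直接返回 true。doSink 方法先依次执行各 handler 的 before 方法，再循环调用 eventStore.tryPut：成功则依次执行各 handler 的 after 方法并返回 true；失败则通过 applyWait 等待、执行各 handler 的 retry 方法后重试，直到 running 为 false 或线程被中断时返回 false。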