本文主要研究一下debezium的SnapshotChangeRecordEmitter
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/SnapshotChangeRecordEmitter.java
/**
 * Relational change record emitter that wraps a single row and always reports it as a
 * {@link Operation#READ} record.
 */
public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter {

    /** Column values of the wrapped row; returned unchanged as the "new" row state. */
    private final Object[] row;

    /**
     * Creates an emitter for one row.
     *
     * @param offset the offset context passed on to the parent emitter
     * @param row the column values of the row to emit
     * @param clock the clock passed on to the parent emitter
     */
    public SnapshotChangeRecordEmitter(OffsetContext offset, Object[] row, Clock clock) {
        super(offset, clock);
        this.row = row;
    }

    /**
     * @return always {@link Operation#READ}
     */
    @Override
    protected Operation getOperation() {
        return Operation.READ;
    }

    /**
     * @return the row array supplied at construction time (same reference, not a copy)
     */
    @Override
    protected Object[] getNewColumnValues() {
        return row;
    }

    /**
     * A READ record carries no previous row state, so this accessor is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected Object[] getOldColumnValues() {
        throw new UnsupportedOperationException("Can't get old row values for READ record");
    }
}
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java
public abstract class RelationalChangeRecordEmitter extends AbstractChangeRecordEmitter<TableSchema> { protected final Logger logger = LoggerFactory.getLogger(getClass()); public RelationalChangeRecordEmitter(OffsetContext offsetContext, Clock clock) { super(offsetContext, clock); } @Override public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException { TableSchema tableSchema = (TableSchema) schema; Operation operation = getOperation(); switch (operation) { case CREATE: emitCreateRecord(receiver, tableSchema); break; case READ: emitReadRecord(receiver, tableSchema); break; case UPDATE: emitUpdateRecord(receiver, tableSchema); break; case DELETE: emitDeleteRecord(receiver, tableSchema); break; default: throw new IllegalArgumentException("Unsupported operation: " + operation); } } @Override protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); Object newKey = tableSchema.keyFromColumnData(newColumnValues); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) { logger.warn("no new values found for table '{}' from create message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo()); return; } receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset()); } @Override protected void emitReadRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); Object newKey = tableSchema.keyFromColumnData(newColumnValues); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), 
getClock().currentTimeAsInstant()); receiver.changeRecord(tableSchema, Operation.READ, newKey, envelope, getOffset()); } @Override protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] oldColumnValues = getOldColumnValues(); Object[] newColumnValues = getNewColumnValues(); Object oldKey = tableSchema.keyFromColumnData(oldColumnValues); Object newKey = tableSchema.keyFromColumnData(newColumnValues); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues); if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) { logger.warn("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo()); return; } // some configurations does not provide old values in case of updates // in this case we handle all updates as regular ones if (oldKey == null || Objects.equals(oldKey, newKey)) { Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); receiver.changeRecord(tableSchema, Operation.UPDATE, newKey, envelope, getOffset()); } // PK update -> emit as delete and re-insert with new key else { Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset()); envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset()); } } @Override protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] oldColumnValues = getOldColumnValues(); Object oldKey = tableSchema.keyFromColumnData(oldColumnValues); Struct oldValue = 
tableSchema.valueFromColumnData(oldColumnValues); if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) { logger.warn("no old values found for table '{}' from delete message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo()); return; } Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset()); } /** * Returns the operation done by the represented change. */ protected abstract Operation getOperation(); /** * Returns the old row state in case of an UPDATE or DELETE. */ protected abstract Object[] getOldColumnValues(); /** * Returns the new row state in case of a CREATE or READ. */ protected abstract Object[] getNewColumnValues(); /** * Whether empty data messages should be ignored. * * @return true if empty data messages coming from data source should be ignored.</br> * Typical use case are PostgreSQL changes without FULL replica identity. */ protected boolean skipEmptyMessages() { return false; } }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/AbstractChangeRecordEmitter.java
public abstract class AbstractChangeRecordEmitter<T extends DataCollectionSchema> implements ChangeRecordEmitter { private final OffsetContext offsetContext; private final Clock clock; public AbstractChangeRecordEmitter(OffsetContext offsetContext, Clock clock) { this.offsetContext = offsetContext; this.clock = clock; } @Override @SuppressWarnings({ "unchecked" }) public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException { Operation operation = getOperation(); switch (operation) { case CREATE: emitCreateRecord(receiver, (T) schema); break; case READ: emitReadRecord(receiver, (T) schema); break; case UPDATE: emitUpdateRecord(receiver, (T) schema); break; case DELETE: emitDeleteRecord(receiver, (T) schema); break; default: throw new IllegalArgumentException("Unsupported operation: " + operation); } } @Override public OffsetContext getOffset() { return offsetContext; } /** * Returns the clock of the change record(s) emitted. */ public Clock getClock() { return clock; } /** * Returns the operation associated with the change. */ protected abstract Operation getOperation(); /** * Emits change record(s) associated with a snapshot. * * @param receiver the handler for which the emitted record should be dispatched * @param schema the schema */ protected abstract void emitReadRecord(Receiver receiver, T schema) throws InterruptedException; /** * Emits change record(s) associated with an insert operation. * * @param receiver the handler for which the emitted record should be dispatched * @param schema the schema */ protected abstract void emitCreateRecord(Receiver receiver, T schema) throws InterruptedException; /** * Emits change record(s) associated with an update operation. 
* * @param receiver the handler for which the emitted record should be dispatched * @param schema the schema */ protected abstract void emitUpdateRecord(Receiver receiver, T schema) throws InterruptedException; /** * Emits change record(s) associated with a delete operation. * * @param receiver the handler for which the emitted record should be dispatched * @param schema the schema */ protected abstract void emitDeleteRecord(Receiver receiver, T schema) throws InterruptedException; }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java
/**
 * Represents one data change and knows how to turn it into change record(s) for a
 * {@link Receiver}.
 */
public interface ChangeRecordEmitter {

    /**
     * Emits the change record(s) corresponding to the data change represented by this emitter.
     *
     * @param schema the schema of the changed data collection
     * @param receiver the handler each emitted record is passed to
     * @throws InterruptedException declared so that implementations and receivers may propagate interruption
     */
    void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException;

    /**
     * Returns the offset of the change record(s) emitted.
     */
    OffsetContext getOffset();

    /**
     * Callback that accepts the individual change records produced by an emitter.
     */
    interface Receiver {

        /**
         * Accepts one change record.
         *
         * @param schema the schema of the changed data collection
         * @param operation the kind of change the record describes
         * @param key the record key
         * @param value the record value
         * @param offset the offset associated with the record
         */
        void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offset)
                throws InterruptedException;
    }
}
SnapshotChangeRecordEmitter 继承了 RelationalChangeRecordEmitter,其构造器接收 offset、row 和 clock;其 getOperation 方法返回 Operation.READ;其 getOldColumnValues 方法抛出 UnsupportedOperationException(READ 记录没有旧的行数据);其 getNewColumnValues 方法返回 row