In this post we take a look at Debezium's SimpleSourceConnector.
debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java
public class SimpleSourceConnector extends SourceConnector {

    protected static final String VERSION = "1.0";

    public static final String TOPIC_NAME = "topic.name";
    public static final String RECORD_COUNT_PER_BATCH = "record.count.per.batch";
    public static final String BATCH_COUNT = "batch.count";
    public static final String DEFAULT_TOPIC_NAME = "simple.topic";
    public static final String INCLUDE_TIMESTAMP = "include.timestamp";
    public static final String RETRIABLE_ERROR_ON = "error.retriable.on";
    public static final int DEFAULT_RECORD_COUNT_PER_BATCH = 1;
    public static final int DEFAULT_BATCH_COUNT = 10;
    public static final boolean DEFAULT_INCLUDE_TIMESTAMP = false;

    private Map<String, String> config;

    public SimpleSourceConnector() {
    }

    @Override
    public String version() {
        return VERSION;
    }

    @Override
    public void start(Map<String, String> props) {
        config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return SimpleSourceConnector.SimpleConnectorTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(config);
        return configs;
    }

    @Override
    public void stop() {
        // do nothing
    }

    @Override
    public ConfigDef config() {
        return null;
    }

    //......
}
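Because start() just stores the props map and taskConfigs(maxTasks) returns it unchanged as the single task config, driving the connector by hand is trivial. A minimal sketch; the class name and the property values are my own, only the property keys come from the constants above:

import io.debezium.connector.simple.SimpleSourceConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleSourceConnectorConfigExample {

    public static void main(String[] args) {
        // keys are the constants defined on SimpleSourceConnector; values are illustrative
        Map<String, String> props = new HashMap<>();
        props.put(SimpleSourceConnector.TOPIC_NAME, "simple.topic");
        props.put(SimpleSourceConnector.RECORD_COUNT_PER_BATCH, "5");
        props.put(SimpleSourceConnector.BATCH_COUNT, "3");
        props.put(SimpleSourceConnector.INCLUDE_TIMESTAMP, "true");
        props.put(SimpleSourceConnector.RETRIABLE_ERROR_ON, "7"); // record id 7 will trigger a RetriableException

        SimpleSourceConnector connector = new SimpleSourceConnector();
        connector.start(props);

        // taskConfigs ignores maxTasks and returns a single config equal to props
        List<Map<String, String>> taskConfigs = connector.taskConfigs(4);
        System.out.println(taskConfigs);

        connector.stop();
    }
}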
debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java
public static class SimpleConnectorTask extends SourceTask {

    private int recordsPerBatch;
    private int errorOnRecord;
    private Queue<SourceRecord> records;
    private final AtomicBoolean running = new AtomicBoolean();
    private List<SourceRecord> retryRecords = null;

    @Override
    public String version() {
        return VERSION;
    }

    @Override
    public void start(Map<String, String> props) {
        if (running.compareAndSet(false, true)) {
            Configuration config = Configuration.from(props);
            recordsPerBatch = config.getInteger(RECORD_COUNT_PER_BATCH, DEFAULT_RECORD_COUNT_PER_BATCH);
            int batchCount = config.getInteger(BATCH_COUNT, DEFAULT_BATCH_COUNT);
            String topic = config.getString(TOPIC_NAME, DEFAULT_TOPIC_NAME);
            boolean includeTimestamp = config.getBoolean(INCLUDE_TIMESTAMP, DEFAULT_INCLUDE_TIMESTAMP);
            errorOnRecord = config.getInteger(RETRIABLE_ERROR_ON, -1);

            // Create the partition and schemas ...
            Map<String, ?> partition = Collect.hashMapOf("source", "simple");
            Schema keySchema = SchemaBuilder.struct()
                    .name("simple.key")
                    .field("id", Schema.INT32_SCHEMA)
                    .build();
            Schema valueSchema = SchemaBuilder.struct()
                    .name("simple.value")
                    .field("batch", Schema.INT32_SCHEMA)
                    .field("record", Schema.INT32_SCHEMA)
                    .field("timestamp", Schema.OPTIONAL_INT64_SCHEMA)
                    .build();

            // Read the offset ...
            Map<String, ?> lastOffset = context.offsetStorageReader().offset(partition);
            long lastId = lastOffset == null ? 0L : (Long) lastOffset.get("id");

            // Generate the records that we need ...
            records = new LinkedList<>();
            long initialTimestamp = System.currentTimeMillis();
            int id = 0;
            for (int batch = 0; batch != batchCount; ++batch) {
                for (int recordNum = 0; recordNum != recordsPerBatch; ++recordNum) {
                    ++id;
                    if (id <= lastId) {
                        // We already produced this record, so skip it ...
                        continue;
                    }
                    if (!running.get()) {
                        // the task has been stopped ...
                        return;
                    }
                    // We've not seen this ID yet, so create a record ...
                    Map<String, ?> offset = Collect.hashMapOf("id", id);
                    Struct key = new Struct(keySchema);
                    key.put("id", id);
                    Struct value = new Struct(valueSchema);
                    value.put("batch", batch + 1);
                    value.put("record", recordNum + 1);
                    if (includeTimestamp) {
                        value.put("timestamp", initialTimestamp + id);
                    }
                    SourceRecord record = new SourceRecord(partition, offset, topic, 1, keySchema, key, valueSchema, value);
                    records.add(record);
                }
            }
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (records.isEmpty()) {
            // block forever, as this thread will be interrupted if/when the task is stopped ...
            new CountDownLatch(1).await();
        }
        if (running.get()) {
            if (retryRecords != null) {
                final List<SourceRecord> r = retryRecords;
                retryRecords = null;
                return r;
            }
            // Still running, so process whatever is in the queue ...
            List<SourceRecord> results = new ArrayList<>();
            int record = 0;
            while (record < recordsPerBatch && !records.isEmpty()) {
                record++;
                final SourceRecord fetchedRecord = records.poll();
                final Integer id = ((Struct) (fetchedRecord.key())).getInt32("id");
                results.add(fetchedRecord);
                if (id == errorOnRecord) {
                    retryRecords = results;
                    throw new RetriableException("Error on record " + errorOnRecord);
                }
            }
            return results;
        }
        // No longer running ...
        return null;
    }

    @Override
    public void stop() {
        // Request the task to stop and return immediately ...
        running.set(false);
    }
}
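To see start/poll in action outside the Connect runtime, the task can be driven directly once a SourceTaskContext has been injected via initialize(); normally the framework does this. A sketch with a stub context of my own, whose offset reader reports no committed offset so the task generates records from id 1:

import io.debezium.connector.simple.SimpleSourceConnector;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleConnectorTaskExample {

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> props = new HashMap<>();
        props.put(SimpleSourceConnector.BATCH_COUNT, "2");
        props.put(SimpleSourceConnector.RECORD_COUNT_PER_BATCH, "3");

        SimpleSourceConnector.SimpleConnectorTask task = new SimpleSourceConnector.SimpleConnectorTask();
        // stub context: no stored offset, so lastId resolves to 0 in start()
        task.initialize(new SourceTaskContext() {
            @Override
            public Map<String, String> configs() {
                return props;
            }

            @Override
            public OffsetStorageReader offsetStorageReader() {
                return new OffsetStorageReader() {
                    @Override
                    public <T> Map<String, Object> offset(Map<String, T> partition) {
                        return null; // no committed offset for this partition
                    }

                    @Override
                    public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) {
                        return new HashMap<>();
                    }
                };
            }
        });

        task.start(props);
        // each poll returns at most record.count.per.batch records (3 here),
        // so two polls drain the 2 x 3 pre-generated records
        for (int i = 0; i < 2; i++) {
            List<SourceRecord> batch = task.poll();
            batch.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
        }
        task.stop();
    }
}

Note that a third poll() here would park forever on the CountDownLatch (the Connect runtime interrupts the polling thread on shutdown), which is why the loop stops once the queue is drained.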
SimpleSourceConnector extends Kafka Connect's SourceConnector; its taskClass() returns SimpleSourceConnector.SimpleConnectorTask.class, and taskConfigs(maxTasks) ignores maxTasks and hands the connector's own config map to a single task. SimpleConnectorTask extends Kafka Connect's SourceTask: its start method reads the last committed offset and pre-generates batch.count * record.count.per.batch SourceRecords into a queue, skipping ids that were already produced; its poll method drains up to record.count.per.batch records from that queue per call, throwing a RetriableException (and stashing the current batch in retryRecords for redelivery on the next poll) when the id configured via error.retriable.on is reached; its stop method simply sets running to false.
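Since SimpleSourceConnector lives in debezium-embedded, a natural way to run it end to end is Debezium's in-process DebeziumEngine (introduced in 1.1). A minimal sketch, assuming the 1.1 engine API; the engine name, the in-memory offset store choice, and the sleep-based shutdown are illustrative, not from the source:

import io.debezium.connector.simple.SimpleSourceConnector;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleSourceConnectorEngineExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("name", "simple-engine"); // required engine name, illustrative
        props.setProperty("connector.class", SimpleSourceConnector.class.getName());
        // keep offsets in memory so the example needs no offset file
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
        props.setProperty("offset.flush.interval.ms", "1000");
        props.setProperty(SimpleSourceConnector.BATCH_COUNT, "2");
        props.setProperty(SimpleSourceConnector.RECORD_COUNT_PER_BATCH, "3");

        // the engine runs the connector in-process and hands each record
        // to the consumer as a JSON-serialized ChangeEvent
        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> System.out.println(record.key() + " -> " + record.value()))
                .build()) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            Thread.sleep(3000); // let the six generated records flow through
            executor.shutdownNow(); // interrupts the blocked poll; close() then stops the engine
        }
    }
}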