本文主要研究一下DebeziumEngine
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java
@Incubating public interface DebeziumEngine<R> extends Runnable, Closeable { //...... public static <T> Builder<T> create(Class<? extends ChangeEventFormat<T>> eventFormat) { final ServiceLoader<Builder> loader = ServiceLoader.load(Builder.class); final Iterator<Builder> iterator = loader.iterator(); if (!iterator.hasNext()) { throw new DebeziumException("No implementation of Debezium engine builder was found"); } final Builder builder = iterator.next(); if (iterator.hasNext()) { LoggerFactory.getLogger(Builder.class).warn("More than one Debezium engine builder implementation was found, using {}", builder.getClass()); } return builder; } //...... }
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java
/**
 * Builder used to configure and create a {@link DebeziumEngine}.
 * Every configuration method returns a {@code Builder<R>}, which allows the calls to be chained.
 *
 * @param <R> the record type handed to the registered consumer
 */
public static interface Builder<R> {

    /**
     * Registers a plain consumer of records.
     *
     * @param consumer the consumer to register
     * @return a builder for further configuration
     */
    Builder<R> notifying(Consumer<R> consumer);

    /**
     * Registers a {@link ChangeConsumer} handler.
     *
     * @param handler the handler to register
     * @return a builder for further configuration
     */
    Builder<R> notifying(ChangeConsumer<R> handler);

    /**
     * Supplies the configuration as {@link Properties}.
     *
     * @param config the configuration properties
     * @return a builder for further configuration
     */
    Builder<R> using(Properties config);

    /**
     * Supplies the class loader to use.
     *
     * @param classLoader the class loader
     * @return a builder for further configuration
     */
    Builder<R> using(ClassLoader classLoader);

    /**
     * Supplies the clock to use.
     *
     * @param clock the clock
     * @return a builder for further configuration
     */
    Builder<R> using(Clock clock);

    /**
     * Supplies the completion callback.
     *
     * @param completionCallback the callback
     * @return a builder for further configuration
     */
    Builder<R> using(CompletionCallback completionCallback);

    /**
     * Supplies the connector callback.
     *
     * @param connectorCallback the callback
     * @return a builder for further configuration
     */
    Builder<R> using(ConnectorCallback connectorCallback);

    /**
     * Supplies the offset commit policy.
     *
     * @param policy the policy
     * @return a builder for further configuration
     */
    Builder<R> using(OffsetCommitPolicy policy);

    /**
     * Creates the engine from the settings supplied so far.
     *
     * @return the engine
     */
    DebeziumEngine<R> build();
}
debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java
public static final class BuilderImpl implements Builder { private Configuration config; private DebeziumEngine.ChangeConsumer<SourceRecord> handler; private ClassLoader classLoader; private Clock clock; private DebeziumEngine.CompletionCallback completionCallback; private DebeziumEngine.ConnectorCallback connectorCallback; private OffsetCommitPolicy offsetCommitPolicy = null; @Override public Builder using(Configuration config) { this.config = config; return this; } @Override public Builder using(Properties config) { this.config = Configuration.from(config); return this; } @Override public Builder using(ClassLoader classLoader) { this.classLoader = classLoader; return this; } @Override public Builder using(Clock clock) { this.clock = clock; return this; } @Override public Builder using(DebeziumEngine.CompletionCallback completionCallback) { this.completionCallback = completionCallback; return this; } @Override public Builder using(DebeziumEngine.ConnectorCallback connectorCallback) { this.connectorCallback = connectorCallback; return this; } @Override public Builder using(OffsetCommitPolicy offsetCommitPolicy) { this.offsetCommitPolicy = offsetCommitPolicy; return this; } @Override public Builder notifying(Consumer<SourceRecord> consumer) { this.handler = buildDefaultChangeConsumer(consumer); return this; } @Override public Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> handler) { this.handler = handler; return this; } @Override public Builder using(java.time.Clock clock) { return using(new Clock() { @Override public long currentTimeInMillis() { return clock.millis(); } }); } @Override public EmbeddedEngine build() { if (classLoader == null) { classLoader = getClass().getClassLoader(); } if (clock == null) { clock = Clock.system(); } Objects.requireNonNull(config, "A connector configuration must be specified."); Objects.requireNonNull(handler, "A connector consumer or changeHandler must be specified."); return new EmbeddedEngine(config, classLoader, 
clock, handler, completionCallback, connectorCallback, offsetCommitPolicy); } // backward compatibility methods @Override public Builder using(CompletionCallback completionCallback) { return using((DebeziumEngine.CompletionCallback) completionCallback); } @Override public Builder using(ConnectorCallback connectorCallback) { return using((DebeziumEngine.ConnectorCallback) connectorCallback); } }
debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java
@ThreadSafe public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> { private final Logger logger = LoggerFactory.getLogger(getClass()); private final Configuration config; private final Clock clock; private final ClassLoader classLoader; private final DebeziumEngine.ChangeConsumer<SourceRecord> handler; private final DebeziumEngine.CompletionCallback completionCallback; private final DebeziumEngine.ConnectorCallback connectorCallback; private final AtomicReference<Thread> runningThread = new AtomicReference<>(); private final VariableLatch latch = new VariableLatch(0); private final Converter keyConverter; private final Converter valueConverter; private final WorkerConfig workerConfig; private final CompletionResult completionResult; private long recordsSinceLastCommit = 0; private long timeOfLastCommitMillis = 0; private OffsetCommitPolicy offsetCommitPolicy; private SourceTask task; private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, DebeziumEngine.ChangeConsumer<SourceRecord> handler, DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy) { this.config = config; this.handler = handler; this.classLoader = classLoader; this.clock = clock; this.completionCallback = completionCallback != null ? 
completionCallback : (success, msg, error) -> { if (!success) { logger.error(msg, error); } }; this.connectorCallback = connectorCallback; this.completionResult = new CompletionResult(); this.offsetCommitPolicy = offsetCommitPolicy; assert this.config != null; assert this.handler != null; assert this.classLoader != null; assert this.clock != null; keyConverter = config.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> this.classLoader); keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true); valueConverter = config.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> this.classLoader); Configuration valueConverterConfig = config; if (valueConverter instanceof JsonConverter) { // Make sure that the JSON converter is configured to NOT enable schemas ... valueConverterConfig = config.edit().with(INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build(); } valueConverter.configure(valueConverterConfig.subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false); // Create the worker config, adding extra fields that are required for validation of a worker config // but that are not used within the embedded engine (since the source records are never serialized) ... Map<String, String> embeddedConfig = config.asMap(ALL_FIELDS); embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); workerConfig = new EmbeddedConfig(embeddedConfig); } public void run() { if (runningThread.compareAndSet(null, Thread.currentThread())) { final String engineName = config.getString(ENGINE_NAME); final String connectorClassName = config.getString(CONNECTOR_CLASS); final Optional<DebeziumEngine.ConnectorCallback> connectorCallback = Optional.ofNullable(this.connectorCallback); // Only one thread can be in this part of the method at a time ... 
latch.countUp(); try { if (!config.validateAndRecord(CONNECTOR_FIELDS, logger::error)) { fail("Failed to start connector with invalid configuration (see logs for actual errors)"); return; } // Instantiate the connector ... SourceConnector connector = null; try { @SuppressWarnings("unchecked") Class<? extends SourceConnector> connectorClass = (Class<SourceConnector>) classLoader.loadClass(connectorClassName); connector = connectorClass.getDeclaredConstructor().newInstance(); } catch (Throwable t) { fail("Unable to instantiate connector class '" + connectorClassName + "'", t); return; } // Instantiate the offset store ... final String offsetStoreClassName = config.getString(OFFSET_STORAGE); OffsetBackingStore offsetStore = null; try { @SuppressWarnings("unchecked") Class<? extends OffsetBackingStore> offsetStoreClass = (Class<OffsetBackingStore>) classLoader.loadClass(offsetStoreClassName); offsetStore = offsetStoreClass.getDeclaredConstructor().newInstance(); } catch (Throwable t) { fail("Unable to instantiate OffsetBackingStore class '" + offsetStoreClassName + "'", t); return; } // Initialize the offset store ... try { offsetStore.configure(workerConfig); offsetStore.start(); } catch (Throwable t) { fail("Unable to configure and start the '" + offsetStoreClassName + "' offset backing store", t); return; } // Set up the offset commit policy ... if (offsetCommitPolicy == null) { offsetCommitPolicy = config.getInstance(EmbeddedEngine.OFFSET_COMMIT_POLICY, OffsetCommitPolicy.class, config); } // Initialize the connector using a context that does NOT respond to requests to reconfigure tasks ... ConnectorContext context = new ConnectorContext() { @Override public void requestTaskReconfiguration() { // Do nothing ... 
} @Override public void raiseError(Exception e) { fail(e.getMessage(), e); } }; connector.initialize(context); OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName, keyConverter, valueConverter); OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName, keyConverter, valueConverter); Duration commitTimeout = Duration.ofMillis(config.getLong(OFFSET_COMMIT_TIMEOUT_MS)); try { // Start the connector with the given properties and get the task configurations ... connector.start(config.asMap()); connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted); List<Map<String, String>> taskConfigs = connector.taskConfigs(1); Class<? extends Task> taskClass = connector.taskClass(); task = null; try { task = (SourceTask) taskClass.getDeclaredConstructor().newInstance(); } catch (IllegalAccessException | InstantiationException t) { fail("Unable to instantiate connector's task class '" + taskClass.getName() + "'", t); return; } try { SourceTaskContext taskContext = new SourceTaskContext() { @Override public OffsetStorageReader offsetStorageReader() { return offsetReader; } public Map<String, String> configs() { // TODO Auto-generated method stub return null; } }; task.initialize(taskContext); task.start(taskConfigs.get(0)); connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted); } catch (Throwable t) { // Mask the passwords ... 
Configuration config = Configuration.from(taskConfigs.get(0)).withMaskedPasswords(); String msg = "Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: " + config; fail(msg, t); return; } recordsSinceLastCommit = 0; Throwable handlerError = null; try { timeOfLastCommitMillis = clock.currentTimeInMillis(); RecordCommitter committer = buildRecordCommitter(offsetWriter, task, commitTimeout); while (runningThread.get() != null) { List<SourceRecord> changeRecords = null; try { logger.debug("Embedded engine is polling task for records on thread {}", runningThread.get()); changeRecords = task.poll(); // blocks until there are values ... logger.debug("Embedded engine returned from polling task for records"); } catch (InterruptedException e) { // Interrupted while polling ... logger.debug("Embedded engine interrupted on thread {} while polling the task for records", runningThread.get()); if (this.runningThread.get() == Thread.currentThread()) { // this thread is still set as the running thread -> we were not interrupted // due the stop() call -> probably someone else called the interrupt on us -> // -> we should raise the interrupt flag Thread.currentThread().interrupt(); } break; } catch (RetriableException e) { logger.info("Retrieable exception thrown, connector will be restarted", e); // Retriable exception should be ignored by the engine // and no change records delivered. 
// The retry is handled in io.debezium.connector.common.BaseSourceTask.poll() } try { if (changeRecords != null && !changeRecords.isEmpty()) { logger.debug("Received {} records from the task", changeRecords.size()); try { handler.handleBatch(changeRecords, committer); } catch (StopConnectorException e) { break; } } else { logger.debug("Received no records from the task"); } } catch (Throwable t) { // There was some sort of unexpected exception, so we should stop work handlerError = t; break; } } } finally { if (handlerError != null) { // There was an error in the handler so make sure it's always captured... fail("Stopping connector after error in the application's handler method: " + handlerError.getMessage(), handlerError); } try { // First stop the task ... logger.debug("Stopping the task and engine"); task.stop(); connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStopped); // Always commit offsets that were captured from the source records we actually processed ... commitOffsets(offsetWriter, commitTimeout, task); if (handlerError == null) { // We stopped normally ... succeed("Connector '" + connectorClassName + "' completed normally."); } } catch (Throwable t) { fail("Error while trying to stop the task and commit the offsets", t); } } } catch (Throwable t) { fail("Error while trying to run connector class '" + connectorClassName + "'", t); } finally { // Close the offset storage and finally the connector ... 
try { offsetStore.stop(); } catch (Throwable t) { fail("Error while trying to stop the offset store", t); } finally { try { connector.stop(); connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped); } catch (Throwable t) { fail("Error while trying to stop connector class '" + connectorClassName + "'", t); } } } } finally { latch.countDown(); runningThread.set(null); // after we've "shut down" the engine, fire the completion callback based on the results we collected completionCallback.handle(completionResult.success(), completionResult.message(), completionResult.error()); } } } public boolean stop() { logger.debug("Stopping the embedded engine"); // Signal that the run() method should stop ... Thread thread = this.runningThread.getAndSet(null); if (thread != null) { try { latch.await( Long.valueOf( System.getProperty(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP, Long.toString(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT.toMillis()))), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } logger.debug("Interrupting the embedded engine's thread {} (already interrupted: {})", thread, thread.isInterrupted()); // Interrupt the thread in case it is blocked while polling the task for records ... thread.interrupt(); return true; } return false; } @Override public void close() throws IOException { stop(); } }
DebeziumEngine提供了静态的create方法,它接收eventFormat参数(该版本的实现中并未使用这个参数),然后使用ServiceLoader.load(Builder.class)加载Builder的实现:如果没有找到任何实现则抛出DebeziumException,如果找到多个实现则打印warn日志,最终返回第一个builder;BuilderImpl实现了Builder接口,其build方法在校验config和handler不为null之后创建的是EmbeddedEngine。