转载

聊聊MaxwellKafkaProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

public class MaxwellKafkaProducer extends AbstractProducer {
	private final ArrayBlockingQueue<RowMap> queue;
	private final MaxwellKafkaProducerWorker worker;

	public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) {
		super(context);
		this.queue = new ArrayBlockingQueue<>(100);
		this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue);
		Thread thread = new Thread(this.worker, "maxwell-kafka-worker");
		thread.setDaemon(true);
		thread.start();
	}

	@Override
	public void push(RowMap r) throws Exception {
		this.queue.put(r);
	}

	@Override
	public StoppableTask getStoppableTask() {
		return this.worker;
	}

	@Override
	public KafkaProducerDiagnostic getDiagnostic() {
		return new KafkaProducerDiagnostic(worker, context.getConfig(), context.getPositionStoreThread());
	}
}
复制代码
  • MaxwellKafkaProducer继承了AbstractProducer,其构造器会创建ArrayBlockingQueue、ArrayBlockingQueue;其push方法则往queue中put数据

MaxwellKafkaProducerWorker

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
	static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);

	private final Producer<String, String> kafka;
	private final String topic;
	private final String ddlTopic;
	private final MaxwellKafkaPartitioner partitioner;
	private final MaxwellKafkaPartitioner ddlPartitioner;
	private final KeyFormat keyFormat;
	private final boolean interpolateTopic;
	private final ArrayBlockingQueue<RowMap> queue;
	private Thread thread;
	private StoppableTaskState taskState;
	private String deadLetterTopic;
	private final ConcurrentLinkedQueue<Pair<ProducerRecord<String,String>, KafkaCallback>> deadLetterQueue;

	public static MaxwellKafkaPartitioner makeDDLPartitioner(String partitionHashFunc, String partitionKey) {
		if ( partitionKey.equals("table") ) {
			return new MaxwellKafkaPartitioner(partitionHashFunc, "table", null, "database");
		} else {
			return new MaxwellKafkaPartitioner(partitionHashFunc, "database", null, null);
		}
	}

	public MaxwellKafkaProducerWorker(MaxwellContext context, String kafkaTopic, ArrayBlockingQueue<RowMap> queue,
		Producer<String,String> producer)
	{
		super(context);

		if ( kafkaTopic == null ) {
			this.topic = "maxwell";
		} else {
			this.topic = kafkaTopic;
		}

		this.interpolateTopic = this.topic.contains("%{");
		this.kafka = producer;

		String hash = context.getConfig().kafkaPartitionHash;
		String partitionKey = context.getConfig().producerPartitionKey;
		String partitionColumns = context.getConfig().producerPartitionColumns;
		String partitionFallback = context.getConfig().producerPartitionFallback;
		this.partitioner = new MaxwellKafkaPartitioner(hash, partitionKey, partitionColumns, partitionFallback);

		this.ddlPartitioner = makeDDLPartitioner(hash, partitionKey);
		this.ddlTopic =  context.getConfig().ddlKafkaTopic;
		this.deadLetterTopic = context.getConfig().deadLetterTopic;
		this.deadLetterQueue = new ConcurrentLinkedQueue<>();

		if ( context.getConfig().kafkaKeyFormat.equals("hash") )
			keyFormat = KeyFormat.HASH;
		else
			keyFormat = KeyFormat.ARRAY;

		this.queue = queue;
		this.taskState = new StoppableTaskState("MaxwellKafkaProducerWorker");
	}

	public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic,
		ArrayBlockingQueue<RowMap> queue)
	{
		this(context, kafkaTopic, queue,
			new KafkaProducer<String,String>(kafkaProperties, new StringSerializer(), new StringSerializer()));
	}

	@Override
	public void run() {
		this.thread = Thread.currentThread();
		while ( true ) {
			try {
				drainDeadLetterQueue();
				RowMap row = queue.take();
				if (!taskState.isRunning()) {
					taskState.stopped();
					return;
				}
				this.push(row);
			} catch ( Exception e ) {
				taskState.stopped();
				context.terminate(e);
				return;
			}
		}
	}

	void drainDeadLetterQueue() {
		Pair<ProducerRecord<String, String>, KafkaCallback> pair;
		while ((pair = deadLetterQueue.poll()) != null) {
			sendAsync(pair.getLeft(), pair.getRight());
		}
	}

	//......

}
复制代码
  • MaxwellKafkaProducerWorker继承了AbstractAsyncProducer,实现了Runnable及StoppableTask接口;其run方法使用while循环,不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync再次发送

AbstractAsyncProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java

public abstract class AbstractAsyncProducer extends AbstractProducer {

	public class CallbackCompleter {
		private InflightMessageList inflightMessages;
		private final MaxwellContext context;
		private final MaxwellConfig config;
		private final Position position;
		private final boolean isTXCommit;
		private final long messageID;

		public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean isTXCommit, MaxwellContext context, long messageID) {
			this.inflightMessages = inflightMessages;
			this.context = context;
			this.config = context.getConfig();
			this.position = position;
			this.isTXCommit = isTXCommit;
			this.messageID = messageID;
		}

		public void markCompleted() {
			inflightMessages.freeSlot(messageID);
			if(isTXCommit) {
				InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);

				if (message != null) {
					context.setPosition(message.position);
					long currentTime = System.currentTimeMillis();
					long age = currentTime - message.sendTimeMS;

					messagePublishTimer.update(age, TimeUnit.MILLISECONDS);
					messageLatencyTimer.update(Math.max(0L, currentTime - message.eventTimeMS - 500L), TimeUnit.MILLISECONDS);

					if (age > config.metricsAgeSlo) {
						messageLatencySloViolationCount.inc();
					}
				}
			}
		}
	}

	private InflightMessageList inflightMessages;

	public AbstractAsyncProducer(MaxwellContext context) {
		super(context);

		this.inflightMessages = new InflightMessageList(context);

		Metrics metrics = context.getMetrics();
		String gaugeName = metrics.metricName("inflightmessages", "count");
		metrics.register(gaugeName, (Gauge<Long>) () -> (long) inflightMessages.size());
	}

	public abstract void sendAsync(RowMap r, CallbackCompleter cc) throws Exception;

	@Override
	public final void push(RowMap r) throws Exception {
		Position position = r.getNextPosition();
		// Rows that do not get sent to a target will be automatically marked as complete.
		// We will attempt to commit a checkpoint up to the current row.
		if(!r.shouldOutput(outputConfig)) {
			if ( position != null ) {
				inflightMessages.addMessage(position, r.getTimestampMillis(), 0L);

				InflightMessageList.InflightMessage completed = inflightMessages.completeMessage(position);
				if (completed != null) {
					context.setPosition(completed.position);
				}
			}
			return;
		}

		// back-pressure from slow producers

		long messageID = inflightMessages.waitForSlot();

		if(r.isTXCommit()) {
			inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
		}

		CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);

		sendAsync(r, cc);
	}
}
复制代码
  • AbstractAsyncProducer继承了AbstractProducer,其push方法主要执行inflightMessages.addMessage及sendAsync

小结

MaxwellKafkaProducer继承了AbstractProducer,其构造器会创建ArrayBlockingQueue、ArrayBlockingQueue;其push方法则往queue中put数据;MaxwellKafkaProducerWorker继承了AbstractAsyncProducer,实现了Runnable及StoppableTask接口;其run方法使用while循环,不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync再次发送

原文  https://juejin.im/post/5eae2ad2e51d452e70669419
正文到此结束
Loading...