本文主要研究一下MaxwellKafkaPartitioner
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/partitioners/MaxwellKafkaPartitioner.java
public class MaxwellKafkaPartitioner extends AbstractMaxwellPartitioner { HashFunction hashFunc; public MaxwellKafkaPartitioner(String hashFunction, String partitionKey, String csvPartitionColumns, String partitionKeyFallback) { super(partitionKey, csvPartitionColumns, partitionKeyFallback); int MURMUR_HASH_SEED = 25342; switch (hashFunction) { case "murmur3": this.hashFunc = new HashFunctionMurmur3(MURMUR_HASH_SEED); break; case "default": default: this.hashFunc = new HashFunctionDefault(); break; } } public int kafkaPartition(RowMap r, int numPartitions) { return Math.abs(hashFunc.hashCode(this.getHashString(r)) % numPartitions); } }
Math.abs(hashFunc.hashCode(this.getHashString(r)) % numPartitions)
计算partition maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java
public abstract class AbstractMaxwellPartitioner { List<String> partitionColumns = new ArrayList<String>(); private final PartitionBy partitionBy, partitionByFallback; private PartitionBy partitionByForString(String key) { if ( key == null ) return PartitionBy.DATABASE; switch(key) { case "table": return PartitionBy.TABLE; case "database": return PartitionBy.DATABASE; case "primary_key": return PartitionBy.PRIMARY_KEY; case "transaction_id": return PartitionBy.TRANSACTION_ID; case "column": return PartitionBy.COLUMN; case "random": return PartitionBy.RANDOM; default: throw new RuntimeException("Unknown partitionBy string: " + key); } } public AbstractMaxwellPartitioner(String partitionKey, String csvPartitionColumns, String partitionKeyFallback) { this.partitionBy = partitionByForString(partitionKey); this.partitionByFallback = partitionByForString(partitionKeyFallback); if ( csvPartitionColumns != null ) this.partitionColumns = Arrays.asList(csvPartitionColumns.split("//s*,//s*")); } static protected String getDatabase(RowMap r) { return r.getDatabase(); } static protected String getTable(RowMap r) { return r.getTable(); } public String getHashString(RowMap r, PartitionBy by) { switch ( by ) { case TABLE: String t = r.getTable(); if ( t == null && partitionByFallback == PartitionBy.DATABASE ) return r.getDatabase(); else return t; case DATABASE: return r.getDatabase(); case PRIMARY_KEY: return r.getRowIdentity().toConcatString(); case TRANSACTION_ID: return String.valueOf(r.getXid()); case COLUMN: String s = r.buildPartitionKey(partitionColumns); if ( s.length() > 0 ) return s; else return getHashString(r, partitionByFallback); case RANDOM: return RandomStringUtils.random(10, true, true); } return null; // thx java } public String getHashString(RowMap r) { if ( r.getPartitionString() != null ) return r.getPartitionString(); else return getHashString(r, partitionBy); } }
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/partitioners/HashFunction.java
public interface HashFunction { int hashCode(String s); }
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/partitioners/HashFunctionDefault.java
public class HashFunctionDefault implements HashFunction { public int hashCode(String s) { return s.hashCode(); } }
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/partitioners/HashFunctionMurmur3.java
public class HashFunctionMurmur3 implements HashFunction { private int seed; public HashFunctionMurmur3(int seed){ this.seed = seed; } public int hashCode(String s) { return MurmurHash3.murmurhash3_x86_32(s, 0, s.length(), seed); } }
MurmurHash3.murmurhash3_x86_32(s, 0, s.length(), seed)
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java
class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask { static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class); private final Producer<String, String> kafka; private final String topic; private final String ddlTopic; private final MaxwellKafkaPartitioner partitioner; private final MaxwellKafkaPartitioner ddlPartitioner; //...... ProducerRecord<String, String> makeProducerRecord(final RowMap r) throws Exception { RowIdentity pk = r.getRowIdentity(); String key = r.pkToJson(keyFormat); String value = r.toJSON(outputConfig); ProducerRecord<String, String> record; if (r instanceof DDLMap) { record = new ProducerRecord<>(this.ddlTopic, this.ddlPartitioner.kafkaPartition(r, getNumPartitions(this.ddlTopic)), key, value); } else { String topic; // javascript topic override topic = r.getKafkaTopic(); if ( topic == null ) { topic = generateTopic(this.topic, pk); } LOGGER.debug("context.getConfig().producerPartitionKey = " + context.getConfig().producerPartitionKey); record = new ProducerRecord<>(topic, this.partitioner.kafkaPartition(r, getNumPartitions(topic)), key, value); } return record; } //...... }
MaxwellKafkaPartitioner继承了AbstractMaxwellPartitioner,其构造器根据hashFunction类型创建HashFunctionMurmur3或者HashFunctionDefault;其kafkaPartition方法则通过 Math.abs(hashFunc.hashCode(this.getHashString(r)) % numPartitions)
计算partition