本文主要研究一下maxwell的FileProducer
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/FileProducer.java
public class FileProducer extends AbstractProducer { private final File file; private final FileWriter fileWriter; public FileProducer(MaxwellContext context, String filename) throws IOException { super(context); this.file = new File(filename); this.fileWriter = new FileWriter(this.file, true); } @Override public void push(RowMap r) throws Exception { String output = r.toJSON(outputConfig); if ( output != null ) { this.fileWriter.write(r.toJSON(outputConfig)); this.fileWriter.write('/n'); this.fileWriter.flush(); } context.setPosition(r); } }
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/row/RowMap.java
public class RowMap implements Serializable { //...... public String toJSON(MaxwellOutputConfig outputConfig) throws Exception { MaxwellJson json = MaxwellJson.getInstance(); JsonGenerator g = json.reset(); g.writeStartObject(); // start of row { g.writeStringField(FieldNames.DATABASE, this.database); g.writeStringField(FieldNames.TABLE, this.table); if ( outputConfig.includesRowQuery && this.rowQuery != null) { g.writeStringField(FieldNames.QUERY, this.rowQuery); } g.writeStringField(FieldNames.TYPE, this.rowType); g.writeNumberField(FieldNames.TIMESTAMP, this.timestampSeconds); if ( outputConfig.includesCommitInfo ) { if ( this.xid != null ) g.writeNumberField(FieldNames.TRANSACTION_ID, this.xid); if ( outputConfig.includesXOffset && this.xoffset != null && !this.txCommit ) g.writeNumberField(FieldNames.TRANSACTION_OFFSET, this.xoffset); if ( this.txCommit ) g.writeBooleanField(FieldNames.COMMIT, true); } if ( this.position != null ) { BinlogPosition binlogPosition = this.position.getBinlogPosition(); if ( outputConfig.includesBinlogPosition ) g.writeStringField(FieldNames.POSITION, binlogPosition.getFile() + ":" + binlogPosition.getOffset()); if ( outputConfig.includesGtidPosition) g.writeStringField(FieldNames.GTID, binlogPosition.getGtid()); } if ( outputConfig.includesServerId && this.serverId != null ) { g.writeNumberField(FieldNames.SERVER_ID, this.serverId); } if ( outputConfig.includesThreadId && this.threadId != null ) { g.writeNumberField(FieldNames.THREAD_ID, this.threadId); } if ( outputConfig.includesSchemaId && this.schemaId != null ) { g.writeNumberField(FieldNames.SCHEMA_ID, this.schemaId); } if ( this.comment != null ) { g.writeStringField(FieldNames.COMMENT, this.comment); } for ( Map.Entry<String, Object> entry : this.extraAttributes.entrySet() ) { g.writeObjectField(entry.getKey(), entry.getValue()); } EncryptionContext encryptionContext = null; if (outputConfig.encryptionEnabled()) { encryptionContext = 
EncryptionContext.create(outputConfig.secretKey); } DataJsonGenerator dataWriter = outputConfig.encryptionMode == EncryptionMode.ENCRYPT_DATA ? json.getEncryptingGenerator() : json.getPlaintextGenerator(); JsonGenerator dataGenerator = dataWriter.begin(); if ( outputConfig.includesPrimaryKeys ) { List<Object> pkValues = new ArrayList<>(); pkColumns.forEach(pkColumn -> pkValues.add(this.data.get(pkColumn))); MaxwellJson.writeValueToJSON(g, outputConfig.includesNulls, FieldNames.PRIMARY_KEY, pkValues); } if ( outputConfig.includesPrimaryKeyColumns ) { MaxwellJson.writeValueToJSON(g, outputConfig.includesNulls, FieldNames.PRIMARY_KEY_COLUMNS, pkColumns); } if ( outputConfig.excludeColumns.size() > 0 ) { // NOTE: to avoid concurrent modification. Set<String> keys = new HashSet<>(); keys.addAll(this.data.keySet()); keys.addAll(this.oldData.keySet()); for ( Pattern p : outputConfig.excludeColumns ) { for ( String key : keys ) { if ( p.matcher(key).matches() ) { this.data.remove(key); this.oldData.remove(key); } } } } writeMapToJSON(FieldNames.DATA, this.data, dataGenerator, outputConfig.includesNulls); if( !this.oldData.isEmpty() ){ writeMapToJSON(FieldNames.OLD, this.oldData, dataGenerator, outputConfig.includesNulls); } dataWriter.end(encryptionContext); g.writeEndObject(); // end of row if(outputConfig.encryptionMode == EncryptionMode.ENCRYPT_ALL){ String plaintext = json.consume(); json.getEncryptingGenerator().writeEncryptedObject(plaintext, encryptionContext); } return json.consume(); } //...... }
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellOutputConfig.java
/**
 * Plain settings holder describing which optional fields are included in the
 * JSON output and whether/how the output is encrypted.
 *
 * <p>All fields are public and mutable; the no-argument constructor leaves
 * them at the defaults declared below.
 */
public class MaxwellOutputConfig {
    // --- optional metadata fields; everything is off unless noted ---
    public boolean includesBinlogPosition = false;
    public boolean includesGtidPosition = false;
    /** On by default. */
    public boolean includesCommitInfo = true;
    /** No explicit default is assigned, so this starts out as {@code false}. */
    public boolean includesXOffset;
    /** On by default. */
    public boolean includesNulls = true;
    public boolean includesServerId = false;
    public boolean includesThreadId = false;
    public boolean includesSchemaId = false;
    public boolean includesRowQuery = false;
    public boolean includesPrimaryKeys = false;
    public boolean includesPrimaryKeyColumns = false;
    public boolean outputDDL = false;

    /** Column-name patterns to exclude; starts out as an empty list. */
    public List<Pattern> excludeColumns = new ArrayList<>();

    // --- encryption: disabled by default, with no key configured ---
    public EncryptionMode encryptionMode = EncryptionMode.ENCRYPT_NONE;
    public String secretKey = null;

    public boolean zeroDatesAsNull = false;

    /** Creates a configuration with the default values declared on the fields. */
    public MaxwellOutputConfig() {
        // defaults are assigned by the field initializers
    }

    /**
     * Reports whether any encryption mode is selected.
     *
     * @return {@code true} unless {@code encryptionMode} is {@code ENCRYPT_NONE}
     */
    public boolean encryptionEnabled() {
        final boolean encryptionOff = (encryptionMode == EncryptionMode.ENCRYPT_NONE);
        return !encryptionOff;
    }
}
FileProducer继承了AbstractProducer,其构造器接收filename参数,然后创建FileWriter;其push方法写入json,然后执行context.setPosition(r);RowMap的toJSON方法根据MaxwellOutputConfig来对RowMap进行json化处理;MaxwellOutputConfig定义了includesBinlogPosition、includesGtidPosition、includesCommitInfo等属性