本文主要研究一下maxwell的Recovery
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/recovery/Recovery.java
public class Recovery { static final Logger LOGGER = LoggerFactory.getLogger(Recovery.class); private final ConnectionPool replicationConnectionPool; private final RecoveryInfo recoveryInfo; private final MaxwellMysqlConfig replicationConfig; private final String maxwellDatabaseName; private final RecoverySchemaStore schemaStore; public Recovery(MaxwellMysqlConfig replicationConfig, String maxwellDatabaseName, ConnectionPool replicationConnectionPool, CaseSensitivity caseSensitivity, RecoveryInfo recoveryInfo) { this.replicationConfig = replicationConfig; this.replicationConnectionPool = replicationConnectionPool; this.recoveryInfo = recoveryInfo; this.schemaStore = new RecoverySchemaStore(replicationConnectionPool, maxwellDatabaseName, caseSensitivity); this.maxwellDatabaseName = maxwellDatabaseName; } public HeartbeatRowMap recover() throws Exception { String recoveryMsg = String.format( "old-server-id: %d, position: %s", recoveryInfo.serverID, recoveryInfo.position ); LOGGER.warn("attempting to recover from master-change: " + recoveryMsg); List<BinlogPosition> list = getBinlogInfo(); for ( int i = list.size() - 1; i >= 0 ; i-- ) { BinlogPosition binlogPosition = list.get(i); Position position = Position.valueOf(binlogPosition, recoveryInfo.getHeartbeat()); Metrics metrics = new NoOpMetrics(); LOGGER.debug("scanning binlog: " + binlogPosition); Replicator replicator = new BinlogConnectorReplicator( this.schemaStore, null, null, replicationConfig, 0L, // server-id of 0 activates "mysqlbinlog" behavior where the server will stop after each binlog maxwellDatabaseName, metrics, position, true, recoveryInfo.clientID, new HeartbeatNotifier(), null, new RecoveryFilter(this.maxwellDatabaseName), new MaxwellOutputConfig(), 0.25f // Default memory usage size, not used ); HeartbeatRowMap h = findHeartbeat(replicator); if ( h != null ) { LOGGER.warn("recovered new master position: " + h.getNextPosition()); return h; } } LOGGER.error("Could not recover from master-change: " + recoveryMsg); return null; } /** * try to find a given heartbeat value from the replicator. * @return A BinlogPosition where the heartbeat was found, or null if none was found. */ private HeartbeatRowMap findHeartbeat(Replicator r) throws Exception { r.startReplicator(); for (RowMap row = r.getRow(); row != null ; row = r.getRow()) { if (!(row instanceof HeartbeatRowMap)) { continue; } HeartbeatRowMap heartbeatRow = (HeartbeatRowMap) row; if (heartbeatRow.getPosition().getLastHeartbeatRead() == recoveryInfo.getHeartbeat()) return heartbeatRow; } return null; } /** * fetch a list of binlog positions representing the start of each binlog file * * @return a list of binlog positions to attempt recovery at * */ private List<BinlogPosition> getBinlogInfo() throws SQLException { ArrayList<BinlogPosition> list = new ArrayList<>(); try ( Connection c = replicationConnectionPool.getConnection() ) { ResultSet rs = c.createStatement().executeQuery("SHOW BINARY LOGS"); while ( rs.next() ) { list.add(BinlogPosition.at(4, rs.getString("Log_name"))); } } return list; } }
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/Maxwell.java
public class Maxwell implements Runnable { protected MaxwellConfig config; protected MaxwellContext context; protected Replicator replicator; static final Logger LOGGER = LoggerFactory.getLogger(Maxwell.class); public Maxwell(MaxwellConfig config) throws SQLException, URISyntaxException { this(new MaxwellContext(config)); } protected Maxwell(MaxwellContext context) throws SQLException, URISyntaxException { this.config = context.getConfig(); this.context = context; this.context.probeConnections(); } //...... private Position attemptMasterRecovery() throws Exception { HeartbeatRowMap recoveredHeartbeat = null; MysqlPositionStore positionStore = this.context.getPositionStore(); RecoveryInfo recoveryInfo = positionStore.getRecoveryInfo(config); if ( recoveryInfo != null ) { Recovery masterRecovery = new Recovery( config.replicationMysql, config.databaseName, this.context.getReplicationConnectionPool(), this.context.getCaseSensitivity(), recoveryInfo ); recoveredHeartbeat = masterRecovery.recover(); if (recoveredHeartbeat != null) { // load up the schema from the recovery position and chain it into the // new server_id MysqlSchemaStore oldServerSchemaStore = new MysqlSchemaStore( context.getMaxwellConnectionPool(), context.getReplicationConnectionPool(), context.getSchemaConnectionPool(), recoveryInfo.serverID, recoveryInfo.position, context.getCaseSensitivity(), config.filter, false ); // Note we associate this schema to the start position of the heartbeat event, so that // we pick it up when resuming at the event after the heartbeat. oldServerSchemaStore.clone(context.getServerID(), recoveredHeartbeat.getPosition()); return recoveredHeartbeat.getNextPosition(); } } return null; } //...... }
Recovery提供了recover方法,它先通过getBinlogInfo方法获取BinlogPosition列表,之后从后往前遍历BinlogPosition构建BinlogConnectorReplicator,然后最后通过findHeartbeat方法查找heartbeatRow.getPosition().getLastHeartbeatRead()为recoveryInfo.getHeartbeat()的HeartbeatRowMap,如果不为null则直接返回