public class BinlogConnectorReplicator extends RunLoopProcess implements Replicator { //...... private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws Exception { BinlogConnectorEvent event; RowMapBuffer buffer = new RowMapBuffer(MAX_TX_ELEMENTS, this.bufferMemoryUsage); String currentQuery = null; while ( true ) { event = pollEvent(); if (event == null) { ensureReplicatorThread(); continue; } EventType eventType = event.getEvent().getHeader().getEventType(); if (event.isCommitEvent()) { if (!buffer.isEmpty()) { buffer.getLast().setTXCommit(); long timeSpent = buffer.getLast().getTimestampMillis() - beginEvent.getEvent().getHeader().getTimestamp(); transactionExecutionTime.update(timeSpent); transactionRowCount.update(buffer.size()); } if(eventType == EventType.XID) { buffer.setXid(event.xidData().getXid()); } return buffer; } switch(eventType) { case WRITE_ROWS: case UPDATE_ROWS: case DELETE_ROWS: case EXT_WRITE_ROWS: case EXT_UPDATE_ROWS: case EXT_DELETE_ROWS: Table table = tableCache.getTable(event.getTableID()); if ( table != null && shouldOutputEvent(table.getDatabase(), table.getName(), filter, table.getColumnNames()) ) { for ( RowMap r : event.jsonMaps(table, getLastHeartbeatRead(), currentQuery) ) if (shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter)) { buffer.add(r); } } currentQuery = null; break; case TABLE_MAP: TableMapEventData data = event.tableMapData(); tableCache.processEvent(getSchema(), this.filter, data.getTableId(), data.getDatabase(), data.getTable()); break; case ROWS_QUERY: RowsQueryEventData rqed = event.getEvent().getData(); currentQuery = rqed.getQuery(); break; case QUERY: QueryEventData qe = event.queryData(); String sql = qe.getSql(); String upperCaseSql = sql.toUpperCase(); if ( upperCaseSql.startsWith(BinlogConnectorEvent.SAVEPOINT)) { LOGGER.debug("Ignoring SAVEPOINT in transaction: " + qe); } else if ( createTablePattern.matcher(sql).find() ) { // CREATE TABLE `foo` SELECT * FROM `bar` will put a CREATE TABLE // inside a transaction. Note that this could, in rare cases, lead // to us starting on a WRITE_ROWS event -- we sync the schema position somewhere // kinda unsafe. processQueryEvent(event); } else if (upperCaseSql.startsWith("INSERT INTO MYSQL.RDS_") || upperCaseSql.startsWith("DELETE FROM MYSQL.RDS_")) { // RDS heartbeat events take the following form: // INSERT INTO mysql.rds_heartbeat2(id, value) values (1,1483041015005) ON DUPLICATE KEY UPDATE value = 1483041015005 // Other RDS internal events like below: // INSERT INTO mysql.rds_sysinfo(name, value) values ('innodb_txn_key','Thu Nov 15 10:30:07 UTC 2018') // DELETE FROM mysql.rds_sysinfo where name = 'innodb_txn_key' // We don't need to process them, just ignore } else if (upperCaseSql.startsWith("DROP TEMPORARY TABLE")) { // Ignore temporary table drop statements inside transactions } else { LOGGER.warn("Unhandled QueryEvent @ {} inside transaction: {}", event.getPosition().fullPosition(), qe); } break; } } } //...... }
public class ListWithDiskBuffer<T> { static final Logger LOGGER = LoggerFactory.getLogger(ListWithDiskBuffer.class); private final long maxInMemoryElements; private final LinkedList<T> list; private long elementsInFile = 0; private File file; private ObjectInputStream is; private ObjectOutputStream os; public ListWithDiskBuffer(long maxInMemoryElements) { this.maxInMemoryElements = maxInMemoryElements; list = new LinkedList<>(); } public void add(T element) throws IOException { list.add(element); while ( shouldBuffer() ) evict(); } protected boolean shouldBuffer() { return this.list.size() > maxInMemoryElements; } protected void resetOutputStreamCaches() throws IOException { os.reset(); } public void flushToDisk() throws IOException { if ( os != null ) os.flush(); } public boolean isEmpty() { return this.size() == 0; } public T getLast() { return list.getLast(); } public T removeFirst(Class<T> clazz) throws IOException, ClassNotFoundException { if ( elementsInFile > 0 ) { if ( is == null ) { os.flush(); is = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file))); } Object object = is.readObject(); T element = clazz.cast(object); elementsInFile--; return element; } else { return list.removeFirst(); } } public Long size() { return list.size() + elementsInFile; } public Long inMemorySize() { return Long.valueOf(list.size()); } @Override protected void finalize() throws Throwable { try { if ( file != null ) file.delete(); } finally { super.finalize(); } } protected T evict() throws IOException { if ( file == null ) { file = File.createTempFile("maxwell", "events"); file.deleteOnExit(); os = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file))); } if ( elementsInFile == 0 ) { LOGGER.info("Overflowed in-memory buffer, spilling over into " + file); } T evicted = this.list.removeFirst(); os.writeObject(evicted); elementsInFile++; if ( elementsInFile % maxInMemoryElements == 0 ) resetOutputStreamCaches(); return evicted; } }
public class RowMapBuffer extends ListWithDiskBuffer<RowMap> { private static long FlushOutputStreamBytes = 10000000; private Long xid; private Long xoffset = 0L; private Long serverId; private Long threadId; private Long schemaId; private long memorySize = 0; private long outputStreamCacheSize = 0; private final long maxMemory; public RowMapBuffer(long maxInMemoryElements) { super(maxInMemoryElements); this.maxMemory = (long) (Runtime.getRuntime().maxMemory() * 0.25); } public RowMapBuffer(long maxInMemoryElements, long maxMemory) { super(maxInMemoryElements); this.maxMemory = maxMemory; } public RowMapBuffer(long maxInMemoryElements, float bufferMemoryUsage) { super(maxInMemoryElements); this.maxMemory = (long) (Runtime.getRuntime().maxMemory() * bufferMemoryUsage); } @Override public void add(RowMap rowMap) throws IOException { this.memorySize += rowMap.getApproximateSize(); super.add(rowMap); } @Override protected boolean shouldBuffer() { return memorySize > maxMemory; } @Override protected RowMap evict() throws IOException { RowMap r = super.evict(); this.memorySize -= r.getApproximateSize(); /* For performance reasons, the output stream will hold on to cached objects. * There's probably a smarter thing to do (write our own serdes, maybe?), but * for now we forcibly flush its cache when it gets too big. */ this.outputStreamCacheSize += r.getApproximateSize(); if ( this.outputStreamCacheSize > FlushOutputStreamBytes ) { resetOutputStreamCaches(); this.outputStreamCacheSize = 0; } return r; } public RowMap removeFirst() throws IOException, ClassNotFoundException { RowMap r = super.removeFirst(RowMap.class); r.setXid(this.xid); r.setXoffset(this.xoffset++); r.setServerId(this.serverId); r.setThreadId(this.threadId); r.setSchemaId(this.schemaId); return r; } public void setXid(Long xid) { this.xid = xid; } public void setServerId(Long serverId) { this.serverId = serverId; } public void setThreadId(Long threadId) { this.threadId = threadId; } public void setSchemaId(Long schemaId) { this.schemaId = schemaId; } }
public class TableCache { private final String maxwellDB; public TableCache(String maxwellDB) { this.maxwellDB = maxwellDB; } private final HashMap<Long, Table> tableMapCache = new HashMap<>(); public void processEvent(Schema schema, Filter filter, Long tableId, String dbName, String tblName) { if ( !tableMapCache.containsKey(tableId)) { if ( filter.isTableBlacklisted(dbName, tblName) ) { return; } Database db = schema.findDatabase(dbName); if ( db == null ) throw new RuntimeException("Couldn't find database " + dbName); else { Table tbl = db.findTable(tblName); if (tbl == null) throw new RuntimeException("Couldn't find table " + tblName + " in database " + dbName); else tableMapCache.put(tableId, tbl); } } } public Table getTable(Long tableId) { return tableMapCache.get(tableId); } public void clear() { tableMapCache.clear(); } }