mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogFileReader.java
public class BinaryLogFileReader implements Closeable { public static final byte[] MAGIC_HEADER = new byte[]{(byte) 0xfe, (byte) 0x62, (byte) 0x69, (byte) 0x6e}; private final ByteArrayInputStream inputStream; private final EventDeserializer eventDeserializer; public BinaryLogFileReader(File file) throws IOException { this(file, new EventDeserializer()); } public BinaryLogFileReader(File file, EventDeserializer eventDeserializer) throws IOException { this(file != null ? new BufferedInputStream(new FileInputStream(file)) : null, eventDeserializer); } public BinaryLogFileReader(InputStream inputStream) throws IOException { this(inputStream, new EventDeserializer()); } public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeserializer) throws IOException { if (inputStream == null) { throw new IllegalArgumentException("Input stream cannot be NULL"); } if (eventDeserializer == null) { throw new IllegalArgumentException("Event deserializer cannot be NULL"); } this.inputStream = new ByteArrayInputStream(inputStream); try { byte[] magicHeader = this.inputStream.read(MAGIC_HEADER.length); if (!Arrays.equals(magicHeader, MAGIC_HEADER)) { throw new IOException("Not a valid binary log"); } } catch (IOException e) { try { this.inputStream.close(); } catch (IOException ex) { // ignore } throw e; } this.eventDeserializer = eventDeserializer; } /** * @return deserialized event or null in case of end-of-stream */ public Event readEvent() throws IOException { return eventDeserializer.nextEvent(inputStream); } @Override public void close() throws IOException { inputStream.close(); } } 复制代码
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java
public class EventDeserializer { private final EventHeaderDeserializer eventHeaderDeserializer; private final EventDataDeserializer defaultEventDataDeserializer; private final Map<EventType, EventDataDeserializer> eventDataDeserializers; private EnumSet<CompatibilityMode> compatibilitySet = EnumSet.noneOf(CompatibilityMode.class); private int checksumLength; private final Map<Long, TableMapEventData> tableMapEventByTableId; private EventDataDeserializer tableMapEventDataDeserializer; private EventDataDeserializer formatDescEventDataDeserializer; //...... public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { if (inputStream.peek() == -1) { return null; } EventHeader eventHeader = eventHeaderDeserializer.deserialize(inputStream); EventData eventData; switch (eventHeader.getEventType()) { case FORMAT_DESCRIPTION: eventData = deserializeFormatDescriptionEventData(inputStream, eventHeader); break; case TABLE_MAP: eventData = deserializeTableMapEventData(inputStream, eventHeader); break; default: EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(eventHeader.getEventType()); eventData = deserializeEventData(inputStream, eventHeader, eventDataDeserializer); } return new Event(eventHeader, eventData); } //...... } 复制代码
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/event/EventHeader.java
public interface EventHeader extends Serializable { long getTimestamp(); EventType getEventType(); long getServerId(); long getHeaderLength(); long getDataLength(); } 复制代码
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/event/EventData.java
public interface EventData extends Serializable { } 复制代码