本文主要研究一下BinaryLogClient的connect
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean { //...... public void connect(final long timeout) throws IOException, TimeoutException { final CountDownLatch countDownLatch = new CountDownLatch(1); AbstractLifecycleListener connectListener = new AbstractLifecycleListener() { @Override public void onConnect(BinaryLogClient client) { countDownLatch.countDown(); } }; registerLifecycleListener(connectListener); final AtomicReference<IOException> exceptionReference = new AtomicReference<IOException>(); Runnable runnable = new Runnable() { @Override public void run() { try { setConnectTimeout(timeout); connect(); } catch (IOException e) { exceptionReference.set(e); countDownLatch.countDown(); // making sure we don't end up waiting whole "timeout" } } }; newNamedThread(runnable, "blc-" + hostname + ":" + port).start(); boolean started = false; try { started = countDownLatch.await(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, e.getMessage()); } } unregisterLifecycleListener(connectListener); if (exceptionReference.get() != null) { throw exceptionReference.get(); } if (!started) { try { terminateConnect(); } finally { throw new TimeoutException("BinaryLogClient was unable to connect in " + timeout + "ms"); } } } //...... }
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean { //...... public void connect() throws IOException { if (!connectLock.tryLock()) { throw new IllegalStateException("BinaryLogClient is already connected"); } boolean notifyWhenDisconnected = false; try { Callable cancelDisconnect = null; try { try { long start = System.currentTimeMillis(); channel = openChannel(); if (connectTimeout > 0 && !isKeepAliveThreadRunning()) { cancelDisconnect = scheduleDisconnectIn(connectTimeout - (System.currentTimeMillis() - start)); } if (channel.getInputStream().peek() == -1) { throw new EOFException(); } } catch (IOException e) { throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port + ". Please make sure it's running.", e); } GreetingPacket greetingPacket = receiveGreeting(); authenticate(greetingPacket); connectionId = greetingPacket.getThreadId(); if ("".equals(binlogFilename)) { synchronized (gtidSetAccessLock) { if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { gtidSet = new GtidSet(fetchGtidPurged()); } } } if (binlogFilename == null) { fetchBinlogFilenameAndPosition(); } if (binlogPosition < 4) { if (logger.isLoggable(Level.WARNING)) { logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4); } binlogPosition = 4; } ChecksumType checksumType = fetchBinlogChecksum(); if (checksumType != ChecksumType.NONE) { confirmSupportOfChecksum(checksumType); } if (heartbeatInterval > 0) { enableHeartbeat(); } gtid = null; tx = false; requestBinaryLogStream(); } catch (IOException e) { disconnectChannel(); throw e; } finally { if (cancelDisconnect != null) { try { cancelDisconnect.call(); } catch (Exception e) { if (logger.isLoggable(Level.WARNING)) { logger.warning("/"" + e.getMessage() + "/" was thrown while canceling scheduled disconnect call"); } } } } connected = true; notifyWhenDisconnected = true; if (logger.isLoggable(Level.INFO)) { String position; synchronized (gtidSetAccessLock) { position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition; } logger.info("Connected to " + hostname + ":" + port + " at " + position + " (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")"); } for (LifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onConnect(this); } if (keepAlive && !isKeepAliveThreadRunning()) { spawnKeepAliveThread(); } ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class); synchronized (gtidSetAccessLock) { if (gtidSet != null) { ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class); } } listenForEventPackets(); } finally { connectLock.unlock(); if (notifyWhenDisconnected) { for (LifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onDisconnect(this); } } } } //...... }
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean { //...... private Callable scheduleDisconnectIn(final long timeout) { final BinaryLogClient self = this; final CountDownLatch connectLatch = new CountDownLatch(1); final Thread thread = newNamedThread(new Runnable() { @Override public void run() { try { connectLatch.await(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, e.getMessage()); } } if (connectLatch.getCount() != 0) { if (logger.isLoggable(Level.WARNING)) { logger.warning("Failed to establish connection in " + timeout + "ms. " + "Forcing disconnect."); } try { self.disconnectChannel(); } catch (IOException e) { if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, e.getMessage()); } } } } }, "blc-disconnect-" + hostname + ":" + port); thread.start(); return new Callable() { public Object call() throws Exception { connectLatch.countDown(); thread.join(); return null; } }; } private void disconnectChannel() throws IOException { connected = false; if (channel != null && channel.isOpen()) { channel.close(); } } //...... }
mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean { //...... private GreetingPacket receiveGreeting() throws IOException { byte[] initialHandshakePacket = channel.read(); if (initialHandshakePacket[0] == (byte) 0xFF /* error */) { byte[] bytes = Arrays.copyOfRange(initialHandshakePacket, 1, initialHandshakePacket.length); ErrorPacket errorPacket = new ErrorPacket(bytes); throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState()); } return new GreetingPacket(initialHandshakePacket); } private void fetchBinlogFilenameAndPosition() throws IOException { ResultSetRowPacket[] resultSet; channel.write(new QueryCommand("show master status")); resultSet = readResultSet(); if (resultSet.length == 0) { throw new IOException("Failed to determine binlog filename/position"); } ResultSetRowPacket resultSetRow = resultSet[0]; binlogFilename = resultSetRow.getValue(0); binlogPosition = Long.parseLong(resultSetRow.getValue(1)); } private ChecksumType fetchBinlogChecksum() throws IOException { channel.write(new QueryCommand("show global variables like 'binlog_checksum'")); ResultSetRowPacket[] resultSet = readResultSet(); if (resultSet.length == 0) { return ChecksumType.NONE; } return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase()); } private void enableHeartbeat() throws IOException { channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000)); byte[] statementResult = channel.read(); if (statementResult[0] == (byte) 0xFF /* error */) { byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length); ErrorPacket errorPacket = new ErrorPacket(bytes); throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState()); } } private void requestBinaryLogStream() throws IOException { long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178 Command dumpBinaryLogCommand; synchronized (gtidSetAccessLock) { if (gtidSet != null) { dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId, useBinlogFilenamePositionInGtidMode ? binlogFilename : "", useBinlogFilenamePositionInGtidMode ? binlogPosition : 4, gtidSet); } else { dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition); } } channel.write(dumpBinaryLogCommand); } private void spawnKeepAliveThread() { final ExecutorService threadExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable runnable) { return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port); } }); try { keepAliveThreadExecutorLock.lock(); threadExecutor.submit(new Runnable() { @Override public void run() { while (!threadExecutor.isShutdown()) { try { Thread.sleep(keepAliveInterval); } catch (InterruptedException e) { // expected in case of disconnect } if (threadExecutor.isShutdown()) { return; } boolean connectionLost = false; if (heartbeatInterval > 0) { connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval; } else { try { channel.write(new PingCommand()); } catch (IOException e) { connectionLost = true; } } if (connectionLost) { if (logger.isLoggable(Level.INFO)) { logger.info("Trying to restore lost connection to " + hostname + ":" + port); } try { terminateConnect(); connect(connectTimeout); } catch (Exception ce) { if (logger.isLoggable(Level.WARNING)) { logger.warning("Failed to restore connection to " + hostname + ":" + port + ". Next attempt in " + keepAliveInterval + "ms"); } } } } } }); keepAliveThreadExecutor = threadExecutor; } finally { keepAliveThreadExecutorLock.unlock(); } } //...... }
show master status
命令,然后解析binlogFilename及binlogPosition;fetchBinlogChecksum方法发送 show global variables like 'binlog_checksum'
命令;enableHeartbeat方法发送 set @master_heartbeat_period
命令;requestBinaryLogStream方法发送DumpBinaryLogGtidCommand命令;spawnKeepAliveThread方法会调度执行keepAliveThread,它主体是while循环,然后通过Thread.sleep(keepAliveInterval)来实现间隔,然后定时发送PingCommand命令,对于connectionLost的执行terminateConnect及connect方法 mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
public class BinaryLogClient implements BinaryLogClientMXBean { //...... private void listenForEventPackets() throws IOException { ByteArrayInputStream inputStream = channel.getInputStream(); boolean completeShutdown = false; try { while (inputStream.peek() != -1) { int packetLength = inputStream.readInteger(3); inputStream.skip(1); // 1 byte for sequence int marker = inputStream.read(); if (marker == 0xFF) { ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1)); throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState()); } if (marker == 0xFE && !blocking) { completeShutdown = true; break; } Event event; try { event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ? new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) : inputStream); if (event == null) { throw new EOFException(); } } catch (Exception e) { Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e; if (cause instanceof EOFException || cause instanceof SocketException) { throw e; } if (isConnected()) { for (LifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onEventDeserializationFailure(this, e); } } continue; } if (isConnected()) { eventLastSeen = System.currentTimeMillis(); updateGtidSet(event); notifyEventListeners(event); updateClientBinlogFilenameAndPosition(event); } } } catch (Exception e) { if (isConnected()) { for (LifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onCommunicationFailure(this, e); } } } finally { if (isConnected()) { if (completeShutdown) { disconnect(); // initiate complete shutdown sequence (which includes keep alive thread) } else { disconnectChannel(); } } } } //...... }
connect方法先执行openChannel方法,之后在connectTimeout大于0且keepAliveThreadRunning为false时通过scheduleDisconnectIn调度执行超时逻辑;之后执行receiveGreeting获取greetingPacket,然后执行authenticate(greetingPacket),之后获取connectionId及设置gtidSet;在binlogFilename为null时执行fetchBinlogFilenameAndPosition;之后执行fetchBinlogChecksum,对于heartbeatInterval大于0的执行enableHeartbeat,最后执行requestBinaryLogStream;若出现IOException则执行disconnectChannel并抛出异常;连接成功之后回调lifecycleListeners的onConnect,对于需要keepAlive的执行spawnKeepAliveThread;最后执行listenForEventPackets