本文主要研究一下SimpleCanalConnector的getWithoutAck方法。
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java
public class SimpleCanalConnector implements CanalConnector { private static final Logger logger = LoggerFactory.getLogger(SimpleCanalConnector.class); private SocketAddress address; private String username; private String password; private int soTimeout = 60000; // milliseconds private int idleTimeout = 60 * 60 * 1000; // client和server之间的空闲链接超时的时间,默认为1小时 private String filter; // 记录上一次的filter提交值,便于自动重试时提交 private final ByteBuffer readHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN); private final ByteBuffer writeHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN); private SocketChannel channel; private ReadableByteChannel readableChannel; private WritableByteChannel writableChannel; private List<Compression> supportedCompressions = new ArrayList<Compression>(); private ClientIdentity clientIdentity; private ClientRunningMonitor runningMonitor; // 运行控制 private ZkClientx zkClientx; private BooleanMutex mutex = new BooleanMutex(false); private volatile boolean connected = false; // 代表connected是否已正常执行,因为有HA,不代表在工作中 private boolean rollbackOnConnect = true; // 是否在connect链接成功后,自动执行rollback操作 private boolean rollbackOnDisConnect = false; // 是否在connect链接成功后,自动执行rollback操作 private boolean lazyParseEntry = false; // 是否自动化解析Entry对象,如果考虑最大化性能可以延后解析 // 读写数据分别使用不同的锁进行控制,减小锁粒度,读也需要排他锁,并发度容易造成数据包混乱,反序列化失败 private Object readDataLock = new Object(); private Object writeDataLock = new Object(); private volatile boolean running = false; //...... public Message getWithoutAck(int batchSize) throws CanalClientException { return getWithoutAck(batchSize, null, null); } public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException { waitClientRunning(); if (!running) { return null; } try { int size = (batchSize <= 0) ? 1000 : batchSize; long time = (timeout == null || timeout < 0) ? 
-1 : timeout; // -1代表不做timeout控制 if (unit == null) { unit = TimeUnit.MILLISECONDS; } writeWithHeader(Packet.newBuilder() .setType(PacketType.GET) .setBody(Get.newBuilder() .setAutoAck(false) .setDestination(clientIdentity.getDestination()) .setClientId(String.valueOf(clientIdentity.getClientId())) .setFetchSize(size) .setTimeout(time) .setUnit(unit.ordinal()) .build() .toByteString()) .build() .toByteArray()); return receiveMessages(); } catch (IOException e) { throw new CanalClientException(e); } } //...... }
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java
public class SimpleCanalConnector implements CanalConnector { //...... private void writeWithHeader(byte[] body) throws IOException { writeWithHeader(writableChannel, body); } private void writeWithHeader(WritableByteChannel channel, byte[] body) throws IOException { synchronized (writeDataLock) { writeHeader.clear(); writeHeader.putInt(body.length); writeHeader.flip(); channel.write(writeHeader); channel.write(ByteBuffer.wrap(body)); } } //...... }
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java
public class SimpleCanalConnector implements CanalConnector { //...... private Message receiveMessages() throws IOException { byte[] data = readNextPacket(); return CanalMessageDeserializer.deserializer(data, lazyParseEntry); } private byte[] readNextPacket() throws IOException { return readNextPacket(readableChannel); } private byte[] readNextPacket(ReadableByteChannel channel) throws IOException { synchronized (readDataLock) { readHeader.clear(); read(channel, readHeader); int bodyLen = readHeader.getInt(0); ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLen).order(ByteOrder.BIG_ENDIAN); read(channel, bodyBuf); return bodyBuf.array(); } } //...... }
getWithoutAck方法先执行writeWithHeader,然后再执行receiveMessages;writeWithHeader发送的Packet为GET类型,其body设置了autoAck为false,还设置了destination、clientId、fetchSize、timeout、unit;receiveMessages方法先执行readNextPacket方法,该方法先通过read方法读取header获取body长度,然后再通过read方法读取body,最后返回body;receiveMessages再通过CanalMessageDeserializer.deserializer将body反序列化为Message