转载

Kafka -- Java消费者管理TCP连接

  1. 消费者会为每个要消费的分区创建与该分区 领导者副本 所在Broker的Socket连接
  2. 假设消费者要消费5个分区的数据,这5个分区各自的领导者副本分布在4台Broker上
    • 那么消费者在消费时会创建与这4台Broker的Socket连接

TCP连接数

日志详解

[2019-05-27 10:00:54,142] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 ( id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者程序创建的 第一个TCP连接 ,该Socket用于发送 FindCoordinator 请求

此时消费者对要连接的Kafka集群 一无所知 ,因此它连接的Broker节点的ID为 -1 ,表示不知道要连接的Broker的任何信息

[2019-05-27 10:00:54,188] DEBUG [Consumer clientId=consumer-1, groupId=test] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name=’t4’)], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)

消费者 复用 刚刚创建的Socket连接,向Kafka集群发送 元数据请求 以获取 整个集群的信息

[2019-05-27 10:00:54,188] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {key=test,key_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:496)

消费者程序开始发送 FindCoordinator 请求给第一步中连接的Broker,即 localhost:9092 (nodeId为 -1

[2019-05-27 10:00:54,203] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node -1 for FIND_COORDINATOR with correlation id 0, received {throttle_time_ms=0,error_code=0,error_message=null, node_id=2,host=localhost,port=9094 } (org.apache.kafka.clients.NetworkClient:837)

十几毫秒后,消费者程序成功地获悉 Coordinator所在的Broker ,即 node_id=2,host=localhost,port=9094

[2019-05-27 10:00:54,204] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 ( id: 2147483645 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者此时已经知道 协调者Broker的连接信息 了,发起第二个Socket连接,创建连向 localhost:9094 的TCP连接

只有连接了Coordinator,消费者才能正常地开启 消费组的各种功能 以及 后续的消息消费

此时的id是由 Integer.MAX_VALUE 减去 Coordinator所在的Broker的Id 计算出来的,即 2147483647 - 2 = 2147483645

这种节点ID的标记方式是Kafka社区 特意为之 ,目的是要让 组协调请求真正的数据获取请求 使用 不同的Socket连接

[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 ( id: 2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 ( id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

[2019-05-27 10:00:54,238] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9093 ( id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者又分别创建了 新的TCP连接 ,主要用于 实际的消息获取

3类TCP连接

  1. 确定协调者获取集群元数据
  2. 连接协调者 ,令其执行组成员管理操作
  3. 执行 实际的消息获取
原文  http://zhongmingmao.me/2019/09/10/kafka-consumer-manage-tcp-connection/
正文到此结束
Loading...