这是崔斯特的第一百一十三篇原创文章
努力、奋斗 (๑• . •๑)
应用程序使用 KafkaConsumer向 Kafka 订阅 Topic 接收消息,首先理解 Kafka 中消费者(consumer)和消费者组(consumer group)的概念和特性。
当生产者向 Topic 写入消息的速度超过了消费者(consumer)的处理速度,导致大量的消息在 Kafka 中淤积,此时需要对消费者进行横向伸缩,用多个消费者从同一个主题读取消息,对消息进行分流。
Kafka 的消费者都属于消费者组(consumer group)。一个组中的 consumer 订阅同样的 topic,每个 consumer 接收 topic 一些分区(partition)中的消息。 同一个分区不能被一个组中的多个 consumer 消费 。
假设现在有一个 Topic 有4个分区,有一个消费者组订阅了这个 Topic,随着组中的消费者数量从1个增加到5个时,Topic 中分区被读取的情况:
如果组中 consumer 的数量超过分区数,多出的 consumer 会被闲置。因此,如果想提高消费者的并行处理能力,需要设置足够多的 partition 数量。
除了通过增加 consumer 来横向伸缩单个应用程序外,还会出现多个应用程序从同一个 Topic 读取数据的情况。这也是 Kafka 设计的主要目标之一:让 Topic 中的数据能够满足各种应用场景的需求。
如果要每个应用程序都可以获取到所有的消息,而不只是其中的一部分,只要保证每个应用程序有自己的 consumer group,就可以获取到 Topic 所有的消息:
横向伸缩 Kafka 消费者和消费者群组并不会对性能造成负面影响。
一个消费者组内的 consumer 共同读取 Topic 的分区。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡(rebalance)。再均衡非常重要,为消费者组带来了高可用性和伸缩性,可以放心的增加或移除消费者。
再均衡期间,消费者无法读取消息,造成整个 consumer group 一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,当前的读取状态会丢失。
消费者通过向作为组协调器(GroupCoordinator)的 broker(不同的组可以有不同的协调器)发送心跳来维持和群组以及分区的关系。心跳表明消费者在读取分区里的消息。消费者会在轮询消息或提交偏移量(offset)时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,组协调器认为消费者已经死亡,会触发一次再均衡。
在 Kafka 0.10.1 的版本中,对心跳行为进行了修改,由一个独立的线程负责心跳。
在读取消息之前,需要先创建一个 KafkaConsumer 对象。创建 KafkaConsumer 对象与创建 KafkaProducer 非常相似,创建 KafkaConsumer 示例:
Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092, broker2:9092"); // group.id,指定了消费者所属群组 props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
创建了消费者之后,需要订阅 Topic,subscribe() 方法接受一个主题列表作为参数:
// topic name is “customerCountries” consumer.subscribe(Collections.singletonList("customerCountries"));
subscribe() 也可以接收一个正则表达式,匹配多个主题(如果有新的名称匹配的主题创建,会立即触发一次再均衡,消费者就可以读取新添加的主题)。在 Kafka 和其他系统之间复制数据时,使用正则表达式的方式订阅多个主题是很常见的做法。
// 订阅所有 test 前缀的 Topic: consumer.subscribe("test.*");
消息轮询是消费者的核心,通过轮询向服务器请求数据。消息轮询 API 会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,开发者只需要处理从分区返回的数据。消费者代码的主要部分如下所示:
try { while (true) { // 100 是超时时间(ms),在该时间内 poll 会等待服务器返回数据 ConsumerReccords<String, String> records = consumer.poll(100); // poll 返回一个记录列表。 // 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。 for (ConsumerReccord<String, String> record : records) { log.debug("topic=%s, partition=%s, offset=%d, customer=%s, country=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } finally { // 关闭消费者,网络连接和 socket 也会随之关闭,并立即触发一次再均衡 consumer.close(); }
在第一次调用新消费者的 poll() 方法时,会负责查找 GroupCoordinator,然后加入群组,接受分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。心跳也是从轮询里发送出去的。
Kafka 与消费者相关的配置大部分参数都有合理的默认值,一般不需要修改,不过有一些参数与消费者的性能和可用性有很大关系。接下来介绍这些重要的属性。
指定消费者从服务器获取记录的最小字节数。服务器在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes,那么会等到有足够的可用数据时才返回给消费者。
合理的设置可以降低消费者和 broker 的工作负载,在 Topic 消息生产不活跃时,减少处理消息次数。如果没有很多可用数据,但消费者的 CPU 使用率却很高,需要调高该属性的值。如果消费者的数量比较多,调高该属性的值也可以降低 broker 的工作负载。
指定在 broker 中的等待时间,默认是500ms。如果没有足够的数据流入 Kafka,消费者获取的数据量的也没有达到 fetch.min.bytes,最终导致500ms的延迟。
如果要降低潜在的延迟(提高 SLA),可以调低该属性的值。fetch.max.wait.ms 和 fetch.min.bytes 有一个满足条件就会返回数据。
指定了服务器从每个分区里返回给消费者的最大字节数,默认值是1MB。也就是说 KafkaConsumer#poll() 方法从每个分区里返回的记录最多不超过 max.parition.fetch.bytes 指定的字节。
如果一个主题有20个分区和5个消费者(同一个组内),那么每个消费者需要至少4MB 的可用内存(每个消费者读取4个分区)来接收记录。如果组内有消费者发生崩溃,剩下的消费者需要处理更多的分区。
max.parition.fetch.bytes 必须比 broker 能够接收的最大消息的字节数(max.message.size)大,否则消费者可能无法读取这些消息,导致消费者一直重试。
另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把 max.parition.fetch.bytes 值改小或者延长会话过期时间。
指定了消费者与服务器断开连接的最大时间,默认是3s。如果消费者没有在指定的时间内发送心跳给 GroupCoordinator,就被认为已经死亡,会触发再均衡,把它的分区分配给其他消费者。
该属性与 heartbeat.interval.ms 紧密相关,heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms 指定了消费者最长多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一,如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。
调低属性的值可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。调高属性的值,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。
指定了消费者在读取一个没有偏移量(offset)的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时井被删除)该作何处理,默认值是 latest,表示在 offset 无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。
另一个值是 earliest,消费者将从起始位置读取分区的记录。
指定了消费者是否自动提交偏移量,默认值是 true,自动提交。
设为 false 可以程序自己控制何时提交偏移量。如果设为 true,需要通过配置 auto.commit.interval.ms 属性来控制提交的频率。
分区分配给组内消费者的策略,根据给定的消费者和 Topic,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略:
默认值是 org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了 Range 策略,org.apache.kafka.clients.consumer.RoundRobinAssignor 是 RoundRobin 策略的实现类。还可以使用自定义策略,属性值设为自定义类的名字。
broker 用来标识从客户端发送过来的消息,可以是任意字符串,通常被用在日志、度量指标和配额中。
用于控制单次调用 call() 方法能够返回的记录数量,帮助控制在轮询里需要处理的数据量。
分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果设为-1就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
每次调用 poll() 方法,总是返回 Kafka 中还没有被消费者读取过的记录,使用偏移量(offset)来记录消费者读取的分区的位置。
更新分区当前位置的操作叫做“提交(commit)”,消费者是如何提交偏移量的呢?
消费者向一个特殊的 Topic: _consumer_offset
发送消息,消息包含每个分区的偏移量。偏移量只有在消费者发生崩溃或者有新的消费者加入群组触发再均衡时有用。完成再均衡之后,消费者可能分配到新的分区,为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的 offset,然后从 offset 指定的地方继续处理。
如果提交的 offset 大于客户端处理的最后一个消息偏移量,那么处于两个偏移量之间的消息会丢失。反之则会消息重复。
所以,处理偏移量的方式对应用程序会有很大的影响。KafkaConsumer API 提供了多种方式来提交偏移量。
最简单的方式是消费者自动提交偏移量。如果 enable.auto.commit 设为 true,那 么每过一定时间间隔,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认是5s。
自动提交是在轮询里进行的。消费者每次在进行轮询时会检查是否需要提交偏移量,如果是,那么会提交从上一次轮询返回的偏移量。
假设我们使用默认的5s提交时间间隔,在最近一次提交之后的3s发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3s,这3s内的数据已经处理过,再次消费是还会获取到。通过调低提交时间间隔来更频繁地提交偏移量,减小可能出现重复消费的时间窗,不过这种情况是无法完全避免的。
在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,并不 知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用 close() 方法前也会进行自动提交)。
在处理异常或提前退出轮询时要格外小心。自动提交虽然方便,不过并没有为开发者留有余地来避免重复处理消息。
KafkaConsumer API 提供的另一种提交偏移量的方式,程序主动触发提交当前偏移量,而不是基于时间间隔自动提交。
把 auto.commit.offset 设为 false,使用 commitSync() 方法提交偏移量最简单也最可靠,该方法会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。
需要注意,commitSync() 将会提交 poll() 返回的最新偏移量,在处理完所有记录后调用 commitSync(),否则还是会有丢失消息的风险。
commitSync() 提交偏移量的例子:
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value()); // 处理消息的逻辑省略 } try { // poll 的数据全部处理完提交 consumer.commitSync(); } catch (CommitFailedException e) { log.error("commit failed", e) } }
只要没有发生不可恢复的错误,commitSync() 会一直尝试直至提交成功。如果提交 失败会抛出 CommitFailedException 异常。
手动提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会阻塞,这会影响应用程序的吞吐量。可以使用异步提交的方式,不等待 broker 的响应。
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } // 异步提交 consumer.commitAsync(); }
在成功提交或发生无法恢复的错误之前,commitSync() 会一直尝试直至提交成功,但是 commitAsync() 不会,这也是该方法的一个问题。之所以不进行重试,是因为在收到服务器响应之前,可能有一个更大的偏移量已经提交成功。
假设我们发出一个请求提交偏移量2000,这个时候发生了短暂的通信问题,服务器收不到请求,与此同时,程序处理了另外一批消息,并成功提交了偏移量3000。如果 commitAsync() 重新尝试提交偏移量2000,有可能将偏移量3000改为2000,这个时候如果发生再均衡,就会出现重复消息。
commitAsync() 支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标,如果要用它来进行重试,一定要注意提交的顺序。
commitAsync() 支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标,如果要用它来进行重试,一定要注意提交的顺序。
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(new OffsetCommitCallback() { // 提交完成时回回调此函数 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception){ if (e != null) log.error("Commit failed for offsets {}", offsets, e); } }); }
重试异步提交
可以使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试。如果序列号比较大,说明有一个新的提交已经发送出去了,放弃重试。
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,如果因为临时网络问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。
在消费者关闭前一般会组合使用 commitAsync() 和 commitSync():
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } // 异步提交 consumer.commitAsync(); } } catch (Exception e) { log.error("Unexpected error", e); } finally { try { // 同步提交 consumer.commitSync(); } finally { consumer.close(); } }
在正常处理流程中,使用异步提交来提高性能,最后使用同步提交来保证位移提交成功。
一般提交偏移量的频率与处理消息批次的频率是一样的。如果想要更频繁地提交怎么办?如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?
这种情况无法通过调用 commitSync() 或 commitAsync() 来实现,只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。
KafkaConsumer API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map。因为消费者可能不只读取一个分区,需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。
// 记录分区的 offset 信息 Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); int count = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value()); // 省略消息处理逻辑 ... // 记录分区的 offset currentOffsets.put( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata") ); // 最多每处理 1000 条记录就提交一次偏移量 if (count % 1000 == 0) consumer.commitAsync(currentOffsets, null); count++; } }
消费者在退出和进行分区再均衡之前,如果消费者知道要失去对一个分区的所有权,它可能需要提交最后一个已处理记录的偏移量。KafakConsumer API 可以在消费者新增分区或者失去分区时进行处理,在调用 subscribe() 方法时传入 ConsumerRebalanceListener 对象,该对象有两个方法:
下面来看一个的例子,在消费者失去某个分区时提交 offset,以便其他消费者可以接着消费消息并处理:
// 记录分区的 offset 信息 Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); class HandleRebalanceimplements ConsumerRebalanceListener{ public void onPartitionsAssigned(Collection<TopicPartition> partitions){ } // 如果发生再均衡,即将失去分区所有权时提交偏移量。 // 提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。 public void onPartitionsRevoked(Collection<TopicPartition> partitions){ System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets); consumer.commitSync(currentOffsets); } } // ... try { // 把 ConsumerRebalanceListener 对象传给 subscribe() 方法 consumer.subscribe(topics, new HandleRebalance()); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { currentOffsets.put( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata") ); } consumer.commitAsync(currentOffsets, null); } } catch (WakeupException e) { // ignore } catch (Exception e) { log.error("Unexpected error", e); } finally { try { consumer.commitSync(currentOffsets); } finally { consumer.close(); } }
除了读取最近一次提交的位置开始消费数据,有时候也需要从特定的偏移量处开始读取消息。
如果想从分区起始位置开始消费,可以使用 seekToBeginning(TopicPartition tp);如果想从分区的最末端消费最新的消息,可以使用 seekToEnd(TopicPartition tp)。Kafka 还支持从指定 offset 处开始消费。最典型的一个是:offset 维护在其他系统(例如数据库)中,并且以其他系统的值为准。
考虑下面的场景:从 Kafka 中读取消息进行处理,最后把结果写入数据库,可能会按如下逻辑处理:
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset()); processRecord(record); storeRecordInDB(record); consumer.commitAsync(currentOffsets); } }
看似正确的逻辑要注意的是,在持久化到数据库成功后,提交位移到 Kafka 可能会失败,出现不一致的情况,那么这可能会导致消息会重复处理。对于这种情况,我们需要将持久化到数据库与提交 offset 实现为原子性操作,最简单的做法,在保存记录到数据库的同时保存 offset 信息,在消费者开始消费时指定数据库的 offset 开始消费。
只需要通过 seek() 来指定分区位移开始消费即可:
class SaveOffsetsOnRebalanceimplements ConsumerRebalanceListener{ public void onPartitionsRevoked(Collection<TopicPartition> partitions){ // 在分区被回收前提交数据库事务,保存消费的记录和位移 commitDBTransaction(); } public void onPartitionsAssigned(Collection<TopicPartition> partitions){ // 在开始消费前,从数据库中获取分区的位移,使用 seek() 指定开始消费的偏移量 for(TopicPartition partition: partitions) consumer.seek(partition, getOffsetFromDB(partition)); } } // ... consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer)); // 调用一次 poll() 方怯,让消费者加入到消费者群组里,并获取分配到的分区 consumer.poll(0); // 然后马上调用 seek() 方法定位分区的偏移量。 // seek() 方法只更新我们正在使用的位置,在下一次调用 poll() 时就可以获得正确的消息。 // 如果 seek() 发生错误, poll() 就会抛出异常。 for (TopicPartition partition: consumer.assignment()) consumer.seek(partition, getOffsetFromDB(partition)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { processRecord(record); // 保存记录结果 storeRecordInDB(record); // 保存位移信息 storeOffsetInDB(record.topic(), record.partition(), record.offset()); } // 提交数据库事务 commitDBTransaction(); }
一般情况下,在主线程中循环 poll() 消息并进行处理。当需要退出循环时,使用另一个线程调用 consumer.wakeup(),会使得 poll() 抛出 WakeupException。如果主线程正在处理消息,那么在下一次主线程调用 poll() 时会抛出异常。样例代码:
// 注册 JVM 关闭时的回调,当 JVM 关闭时调用 Runtime.getRuntime().addShutdownHook(new Thread() { public void run(){ System.out.println("Starting exit..."); // 调用消费者的 wakeup 方法通知主线程退出 consumer.wakeup(); try { // 等待主线程退出 mainThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }); ... // 消费主线程 try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { // ... } consumer.commitSync(); } } catch (WakeupException e) { // ignore } finally { consumer.close(); }
Kafka 生产者将对象序列化成字节数组并发送到服务器,消费者需要将字节数组转换成对象(反序列化)。序列化与反序列化需要匹配,与生产者类似,推荐使用 Avro 序列化方式。
样例代码如下(与生产者实现类似):
Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); props.put("schema.registry.url", schemaUrl); String topic = "customerContacts" KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url)); consumer.subscribe(Collections.singletonList(topic)); System.out.println("Reading topic:" + topic); while (true) { // 这里使用之前生产者使用的Avro生成的Customer类 ConsumerRecords<String, Customer> records = consumer.poll(1000); for (ConsumerRecord<String, Customer> record: records) { System.out.println("Current customer name is: " + record.value().getName()); } consumer.commitSync(); }
一般情况下都是使用消费者组(即使只有一个消费者)来消费消息的,这样可以在增加或减少消费者时自动进行分区重平衡,这种方式是推荐的。
在知道主题和分区的情况下,也可以使用单个消费者来进行消费,需要实现给消费者分配分区,而不是让消费者订阅主题。代码样例:
// 获取主题下所有的分区 List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic"); if (partitionInfos != null) { for (PartitionInfo partition : partitionInfos) partitions.add(new TopicPartition(partition.topic(), partition.partition())); // 为消费者指定分区 consumer.assign(partitions); while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record: records) { // ... } consumer.commitSync(); } }
除了需要主动获取分区以及没有分区重平衡,其他的处理逻辑是一样的。需要注意的是,如果添加了新的分区,这个消费者是感知不到的,需要通过 consumer.partitionsFor() 来重新获取分区。
《Kafka权威指南》