官方英文版本: A Guide To The Kafka Protocol
中文翻译: watchword 翻译于2016年1月31日,修改于6月17日,基于原文2016年5月5日修改版本(v.106)修改翻译: Kafka通讯协议指南
smallnest 基于原文 Jan 20, 2017版本修改。
如果想深入了解Kafka的通讯协议的话,这篇文章不可不读。感谢 watchword 将原文翻译成了中文,我基于最新版进行了修订,修订和完善翻译中的错误。
此文档涵盖了Kafka 0.8及之后版本实现的通讯协议。其目的是提供一个易读的协议文档, 包含可用的请求API及其二进制格式, 以及如何正确使用他们来实现一个客户端。本文假设您已经了解了Kafka基本的设计以及 术语 。
0.7 和更早的版本所使用的协议与此类似,但我们(希望)通过一次性地斩断兼容性,以便清理原有设计上的沉疴,并且泛化一些概念。
Kafka协议是相当简单的,只有六种核心的客户端请求的API:
每一种API都将在下面详细说明。此外,从0.9版本开始,Kafka支持为消费者和Kafka Connect提供通用的分组管理。为此客户端API又提供五个请求:
最后,有几个管理API,可用于监控/管理的Kafka集群( KIP-4 完成时,这个列表将变长):
Kafka使用基于TCP的二进制协议。该协议定义了所有API的请求及响应消息。所有消息都是通过长度来分隔,并且由后面描述的基本类型组成。
客户端初始化一个socket连接,并且写入请求的消息序列和读回相应的响应消息。连接和断开时均不需要握手消息。如果保持长连接,那么TCP协议本身将会节省很多TCP握手时间,但如果真的重新建立连接,那么代价也相当小。
客户可能需要维持到多个broker的连接,因为数据是被分区的,而客户端需要和存储这些分区的broker服务器进行通讯。当然,一般而言,不需要为单个服务端和单个客户端间维护多个连接(即连接池技术)。
服务器保证单一的TCP连接中,请求将被顺序处理,响应也将按该顺序返回。为保证broker的处理请求的顺序,单个连接同时也只会处理一个请求指令。请注意,客户端可以(也应该)使用非阻塞IO实现请求流水线,从而实现更高的吞吐量。也就是说,客户可以在等待上次请求应答的同时发送下个请求,因为待完成的请求将会在底层操作系统套接字缓冲区进行缓冲。除非特别说明,所有的请求是由客户端启动,并从服务器获取到相应的响应消息。
服务器能够配置请求大小的最大限制,超过这个限制将导致socket连接被断开。
Kafka是一个分区系统,所以不是所有的服务器都有完整的数据集。Topic被分为P(预先定义的分区数量)个分区,每个分区被复制N(复制因子)份,Topic Partition根据顺序在“提交日志”中编号为0,1,…,P。
所有具有这种特性的系统都有一个如何制定某个特定数据应该被分配给哪个特定的分区的问题。Kafka中它由客户端直接控制分配策略,broker则没有特别的语义来决定消息发布到哪个分区。相反,生产者直接将消息发送到一个特定的分区,获取消息时,消费者也直接从某个特定的分区获取。如果两个生产者要使用相同的分区方案,那么他们必须用同样的方法来计算Key到分区映射关系。
这些发布或获取数据的请求必须发送到指定分区的 leader broker中。此条件同时也会由broker检查,发送到不正确的broker的请求将会返回NotLeaderForPartition 错误代码(后文所描述的)。
那么客户端如何找出存在的topic呢,这些topic有哪些分区,以及这些分区被哪些broker保存,以便它可以直接将请求发送到正确的主机上?这个信息是动态的,因此你不能只是为每个客户端配置一些静态映射。相反所有的Kafka broker都可以回答描述集群当前状态的数据请求:有哪些topic,这些topic都有多少分区,哪个broker是这些分区的Leader,以及这些broker主机的地址和端口信息。
换句话说,客户端只需要找到一个broker,broker将会告知客户端所有其他存在的broker,以及这些broker上面的所有分区。这个broker本身也可能会掉线,因此客户端实现的最佳做法是保存两三个broker地址。用户可以选择使用负载均衡器或只是静态地配置两个或三个客户的Kafka主机。
客户并不需要轮询地查看集群是否已经改变;它可以获取元数据一次然后缓存起来,等到它接收到所用的元数据过期的错误信息时再更新元数据。这种错误有两种形式:(1)套接字错误指示客户端不能与特定的broker进行通信,
(2)请求响应表明该broker不再是其请求数据分区的Leader的错误。
轮询“初始”Kafka的URL列表,直到我们找到一个我们可以连接到的broker,获取集群元数据。
处理获取数据或者生产消息请求,根据这些请求所发送的topic和分区,将这些请求发送到合适的broker。
如果我们得到一个合适的错误(显示元数据已经过期时),刷新元数据,然后再试一次。
上面提到消息的分区分配是由生产者客户端控制,那么,这个功能是如何暴露给最终用户的?
在Kafka中,分区有两个目的:
对于给定的使用场景下,你可能只关心其中的一个或两个。
为了实现简单的负载均衡,一个简单的策略是客户端发布消息是对所有broker进行轮询请求(round robin requests)。另一种选择,在那些生产者比消费者多的场景下,给每个客户机随机选择一个分区并发布消息到该分区。后一种的策略能够使用少得多的TCP连接。
语义分区是指使用消息中的 key 来决定分配的分区。例如,如果你正在处理一个点击消息流时,你可能想通过用户ID来划分流,使得特定用户的所有数据会被单个消费者消费。要做到这一点,客户端可以采取与消息相关联的key,并使用key的某种Hash值来选择要发送的分区。
我们的API鼓励将小的请求批量处理以提高效率。我们发现这能非常显著地提升性能。我们两个用来发送消息和获取消息的API,总是鼓励处理一连串的消息,而不是单一的消息。聪明的客户端可以利用这一点,并支持“异步”操作模式,以此进行批处理哪些单独发送的消息,并把它们以较大的块进行发送。我们可以进一步允许跨多个主题和分区的批处理,所以生产请求可能包含追加到许多分区的数据,一个读取请求可以一次性从多个分区提取数据的。
当然,如果他们喜欢,客户端实现者可以选择忽略这一点,所有消息一次都发送一个。
该协议的目的要达到在向后兼容的基础上渐进演化。我们的版本是基于每个API基础之上,每个版本包括一个请求和响应对。每个请求包含API Key,里面包含了被调用的API标识,以及表示这些请求和响应格式的版本号。
这样做的目的是允许客户端实现相应特定版本的请求,在请求中标志版本信息。目标主要是为了在不允许停机的环境下进行更新,这种环境下,客户端和服务器不能一次性都切换所使用的API。
服务器会拒绝它不支持的版本的请求,并始终返回它期望收到的能够完成请求响应的版本的协议格式。建议的升级路径方式是,新功能将首先部署到服务器(老客户端无法完全利用他们的新功能),然后随着新的客户端的部署,这些新功能将逐步被利用。
目前,所有版本从0开始,当我们介绍这些API时,我们将分别显示每个版本的格式。
该协议是建立在下列基本类型之上。
定长基本类型(Fixed Width Primitives)
int8, int16, int32, int64 – 不同精度(以bit数区分)的带符号整数,以大端(Big Endiam)方式存储.
变长基本类型(Variable Length Primitives)
bytes, string – 这些类型由一个表示长度的带符号整数N以及后续N字节的内容组成。长度如果为-1表示空(null). string 使用int16表示长度,bytes使用int32.
数组(Arrays)
这个类型用来处理重复的结构体数据。他们总是由一个代表元素个数int32整数N,以及后续的N个重复结构体组成,这些结构体自身是有其他的基本数据类型组成。我们后面会用BNF语法展示一个foo的结构体数组[foo]
后面的 BNF 明确地以上下文无关的语法展示了请求和响应的二进制格式。每个API都会一起给出请求和响应定义,以及所有的子定义(sub-definitions)。BNF使用没有经过缩写的便于阅读的名称(比如我使用一个符号化了的名称来定义了一个production 错误码,即便它只是int16整数)。一般在BNF中,一个production序列表示一个连接,所以下面给出的MetadataRequest将是一个含有VersionId,然后clientId,然后TopicNames的数组(每一个都有其自身的定义)。自定义类型一般使用驼峰法拼写,基本类型使用全小写方式拼写。当存在多中可能的自定义类型时,使用’|’符号分割,并且用括号表示分组。顶级定义不缩进,后续的子部分会被缩进。
所有请求和响应都从以下语法为基础,其余的会在本文剩下部分中进行增量描述:
RequestOrResponse => Size (RequestMessage | ResponseMessage) Size => int32
域(FIELD) | 描述 |
---|---|
MessageSize | MessageSize 域给出了后续请求或响应消息的字节(bytes)长度。客户端可以先读取4字节的长度N,然后读取并解析后续的N字节请求内容。 |
所有请求都具有以下格式:
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage ApiKey => int16 ApiVersion => int16 CorrelationId => int32 ClientId => string RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
域(FIELD) | 描述 |
---|---|
ApiKey | 这是一个表示所调用的API的数字id(即它表示是一个元数据请求?生产请求?获取请求等). |
ApiVersion | 这是该API的一个数字版本号。我们为每个API定义一个版本号,该版本号允许服务器根据版本号正确地解释请求内容。响应消息也始终对应于所述请求的版本的格式。 |
CorrelationId | 这是一个用户提供的整数。它将会被服务器原封不动地回传给客户端。用于匹配客户机和服务器之间的请求和响应。 |
ClientId | 这是为客户端应用程序的自定义的标识。用户可以使用他们喜欢的任何标识符,他们会被用在记录错误时,监测统计信息等场景。例如,你可能不仅想要监视每秒的总体请求,还要根据客户端应用程序进行监视,那它就可以被用上(其中每一个都将驻留在多个服务器上)。这个ID作为特定的客户端对所有的请求的逻辑分组。 |
下面我们就来描述各种请求和响应消息。
Response => CorrelationId ResponseMessage CorrelationId => int32 ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
域(FIELD) | 描述 |
---|---|
CorrelationId | 服务器传回给客户端它所提供用作关联请求和响应消息的整数。 |
所有响应都是与请求成对匹配(例如,我们将发送回一个元数据请求,会得到一个元数据响应)。
生产和获取消息指令请求共享同一个消息集结构。在Kafka中,消息是由一个键值对以及少量相关的元数据组成。消息只是一个有偏移量和大小信息的消息序列。这种格式正好即用于在broker上的磁盘上存储,也用在线上数据交换。
消息集也是Kafka中的压缩单元,我们也允许消息递归包含压缩消息从而允许批量压缩。
注意, 在通讯协议中,消息集之前没有类似的其他数组元素的int32。
MessageSet => [Offset MessageSize Message] Offset => int64 MessageSize => int32
消息格式
v0 Message => Crc MagicByte Attributes Key Value Crc => int32 MagicByte => int8 Attributes => int8 Key => bytes Value => bytes v1 (supported since 0.10.0) Message => Crc MagicByte Attributes Key Value Crc => int32 MagicByte => int8 Attributes => int8 Timestamp => int64 Key => bytes Value => bytes
域(FIELD) | 描述 |
---|---|
Offset | 这是在Kafka中作为日志序列号使用的偏移量。当生产者发送非压缩消息,这时候它可以填写任意值。当生产者发送压缩消息,为了避免服务端重新压缩,每个压缩消息的内部消息的偏移量应该从0开始,然后依次增加(kafka压缩消息详见后面的描述)。 |
Crc | Crc是的剩余消息字节的CRC32值。broker和消费者可用来检查信息的完整性。 |
MagicByte | 这是一个用于允许消息二进制格式的向后兼容演化的版本id。当前值是0。 |
Attributes | 这个字节保存有关信息的元数据属性。最低的3位包含用于消息的压缩编解码器。第四位表示时间戳类型,0代表CreateTime,1代表LogAppendTime。生产者必须把这个位设成0。所有其他位必须被设置为0。 |
Timestamp | 消息的时间戳。时间戳类型在Attributes域中体现。单位为从UTC标准准时间1970年1月1日0点到所在时间的毫秒数。 |
Key | Key是一个可选项,它主要用来进行指派分区。Key可以为null。 |
Value | Value是消息的实际内容,类型是字节数组。Kafka支持本身递归包含,因此本身也可能是一个消息集。消息可以为null。 |
Kafka支持压缩多条消息以提高效率,当然,这比压缩一条原始消息要来得复杂。因为单个消息可能没有足够的冗余信息以达到良好的压缩比,压缩的多条信息必须以特殊方式批量发送(当然,如果真的需要的话,你可以自己压缩批处理的一个消息)。要被发送的消息被包装(未压缩)在一个MessageSet结构中,然后将其压缩并存储在一个单一的“消息”中,一起保存的还有相应的压缩编解码集。接收系统通过解压缩得到实际的消息集。外层MessageSet应该只包含一个压缩的“消息”(详情见 Kafka-1718 )。
Kafka目前支持一下两种压缩算法:
压缩算法(COMPRESSION) | 编码器编号(CODEC) |
---|---|
None | 0 |
GZIP | 1 |
Snappy | 2 |
本节将给出每个API的用法、二进制格式,以及它们的字段的含义的细节。
这个API回答下列问题:
这是唯一一个能发往集群中任意一个broker的请求消息。
因为可能有很多topic,客户端可以给一个的可选topic列表,以便只返回一组topic元数据。
返回的元数据信息是分区级别的信息,为了方便和以避免冗余,以topic为组集中在一起。每个分区的元数据中包含了leader以及所有副本以及正在同步的副本的信息。
注意: 如果broker配置中设置了”auto.create.topics.enable”, topic元数据请求将会以默认的复制因子和默认的分区数为参数创建topic。
TopicMetadataRequest => [TopicName] TopicName => string
域(FIELD | ) 描述 |
---|---|
TopicName | 要获取元数据的主题数组。 如果为空,就返回所有主题的元数据 |
响应包含的每个分区的元数据,这些分区元数据以topic为组组装在一起。该元数据以broker id来指向具体的broker。每个broker有一个地址和端口。
MetadataResponse => [Broker][TopicMetadata] Broker => NodeId Host Port (any number of brokers may be returned) NodeId => int32 Host => string Port => int32 TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] TopicErrorCode => int16 PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr PartitionErrorCode => int16 PartitionId => int32 Leader => int32 Replicas => [int32] Isr => [int32]
域(FIELD) | 描述 |
---|---|
Leader | 该分区作为Leader节点的Kafka broker id。如果在一个Leader选举过程中,没有Leader存在,这个id将是-1。 |
Replicas | 该分区中,其他活着的作为slave的节点集合。 |
Isr | 副本集合中,所有处在与Leader跟随(“caught up”,表示数据已经完全复制到这些节点)状态的子集 |
Broker | kafka broker节点的id, 主机名, 端口信息 |
生产者API用于将消息集发送到服务器。为了提高效率,它允许在单个请求中发送多个不同topic的不同分区的消息。
生产者API使用通用的消息集格式,但由于发送时还没有被分配偏移量,因此可以任意填写该值。
v0, v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later) ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]] RequiredAcks => int16 Timeout => int32 Partition => int32 MessageSetSize => int32
v1及以后版本表示客户端可以在response中解析quota throttle time。
v2及以后版本表示客户端可以解析response中的时间戳域。
域(FIELD) | 描述 |
---|---|
RequiredAcks | 这个值表示服务端收到多少确认后才发送反馈消息给客户端。如果设置为0,那么服务端将不发送response(这是唯一的服务端不发送response的情况)。如果这个值为1,那么服务器将等到数据写入到本地日之后发送response。如果这个值是-1,那么服务端将阻塞,知道这个消息被所有的同步副本写入后再发送response。 |
Timeout | 这个值提供了以毫秒为单位的超时时间,服务器可以在这个时间内可以等待接收所需的Ack确认的数目。超时并非一个确切的限制,有以下原因:(1)不包括网络延迟,(2)计时器开始在这一请求的处理开始,所以如果有很多请求,由于服务器负载而导致的排队等待时间将不被包括在内,(3)如果本地写入时间超过超时,我们将不会终止本地写操作,这样这个超时时间就不会得到遵守。要使硬超时时间,客户端应该使用套接字超时。 |
TopicName | 该数据将会发布到的topic名称 |
Partition | 该数据将会发布到的分区 |
MessageSetSize | 后续消息集的长度,字节为单位 |
MessageSet | 上面描述的标准格式的消息集合 |
v0 ProduceResponse => [TopicName [Partition ErrorCode Offset]] TopicName => string Partition => int32 ErrorCode => int16 Offset => int64 v1 (supported in 0.9.0 or later) ProduceResponse => [TopicName [Partition ErrorCode Offset]] ThrottleTime TopicName => string Partition => int32 ErrorCode => int16 Offset => int64 ThrottleTime => int32 v2 (supported in 0.10.0 or later) ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]] ThrottleTime TopicName => string Partition => int32 ErrorCode => int16 Offset => int64 Timestamp => int64 ThrottleTime => int32
域 | 描述 |
---|---|
Topic | 此响应对应的主题。 |
Partition | 此响应对应的分区。 |
ErrorCode | 如果有,此分区对应的错误信息。错误以分区为单位提供,因为可能存在给定的分区不可用或者被其他的主机维护(非Leader),但是其他的分区的请求操作成功的情况 |
Offset | 追加到该分区的消息集中的分配给第一个消息的偏移量。 |
Timestamp | 如果该主题使用了LogAppendTime,这个时间戳就是broker分配给这个消息集。这个消息集中的所有消息都有相同的时间戳。如果使用的是CreateTime,这个域始终是-1。如果没有返回错误码,生产者可以假定消息的时间戳已经被broker接受。单位为从UTC标准准时间1970年1月1日0点到所在时间的毫秒数。 |
ThrottleTime | 由于限额冲突而导致的时间延迟长度,以毫秒为单位。(如果没有违反限额条件,此值为0) |
可能的错误码(Possible Error Codes):( TODO)
获取消息接口用于获取一些topic分区的一个或多个的日志块。逻辑上根据指定topic,分区和消息起始偏移量开始获取一批消息。在一般情况下,返回消息的偏移量将大于或等于开始偏移量。然而,如果是压缩消息,有可能返回的消息的偏移量比起始偏移量小。这类的消息的数量通常较少,并且调用者必须负责过滤掉这些消息。
获取数据指令请求遵循一个长轮询模型,如果没有足够数量的消息可用,它们可以阻塞一段时间。
作为优化,服务器被允许在消息集的末尾返回一个消息的一部分。客户端应处理这种情况。
有一点要注意的是,获取消息API需要指定消费的分区。现在的问题是如何让消费者知道消费哪个分区?特别地,作为一组消费者,如何使得每个消费者获取分区的一个子集,并且平衡这些分区。我们使用zookeeper动态地为Scala和Java客户端完成这个任务。这种方法的缺点是,它需要一个相当胖的客户端并且需要客户端与zookeeper连接。我们尚未创建一个Kafka接口(API),允许该功能被移动到在服务器端并被更方便地访问。一个简单的消费者的客户端可以通过配置指定访问的分区,但这样将不能在某些消费者失效后做到分区的动态重新分配。我们希望能在下一个主要版本解决这一空白。
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]] ReplicaId => int32 MaxWaitTime => int32 MinBytes => int32 TopicName => string Partition => int32 FetchOffset => int64 MaxBytes => int32
域 | 描述 |
---|---|
ReplicaId | 副本ID的是发起这个请求的副本节点ID。普通消费者客户端应该始终将其指定为-1,因为他们没有节点ID。其他broker设置他们自己的节点ID。基于调试目的,以非代理身份模拟副本broker发出获取数据指令请求时,这个值填-2。 |
MaxWaitTime | 如果没有足够的数据可发送时,最大阻塞等待时间,以毫秒为单位。 |
MinBytes | 返回响应消息的最小字节数目,必须设置。如果客户端将此值设为0,服务器将会立即返回,但如果没有新的数据,服务端会返回一个空消息集。如果它被设置为1,则服务器将在至少一个分区收到一个字节的数据的情况下立即返回,或者等到超时时间达到。通过设置较高的值,结合超时设置,消费者可以在牺牲一点实时性能的情况下通过一次读取较大的字节的数据块从而提高的吞吐量(例如,设置MaxWaitTime至100毫秒,设置MinBytes为64K,将允许服务器累积数据达到64K前等待长达100ms再响应)。 |
TopicName | topic名称 |
Partition | 获取数据的Partition id |
FetchOffset | 获取数据的起始偏移量 |
MaxBytes | 此分区返回消息集所能包含的最大字节数。这有助于限制响应消息的大小。 |
v0 FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] TopicName => string Partition => int32 ErrorCode => int16 HighwaterMarkOffset => int64 MessageSetSize => int32 v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later) FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] ThrottleTime => int32 TopicName => string Partition => int32 ErrorCode => int16 HighwaterMarkOffset => int64 MessageSetSize => int32
域 | 描述 |
---|---|
ThrottleTime | 由于限额冲突而导致的时间延迟长度,以毫秒为单位。(如果没有违反限额条件,此值为0) |
TopicName | 返回消息所对应的Topic名称。 |
Partition | 返回消息所对应的分区id。 |
HighwaterMarkOffset | 此分区日志中最末尾的偏移量。此信息可被客户端用来确定后面还有多少条消息。 |
MessageSetSize | 此分区中消息集的字节长度 |
MessageSet | 此分区获取到的消息集,格式与之前描述相同 |
v1只会包含v0格式的消息.
v2可能即包含v0又包含v1版本格式的消息.
可能的错误码(Possible Error Codes)
此API描述了一组topic分区的偏移量有效范围。生产者和获取数据API的请求必须发送到分区Leader所在的broker上,这需要通过使用元数据的API来确定。
自版本0, response包含请求的分区的起始偏移量以及“log end offset”,即,将被追加到给定分区中的下一个消息的偏移量。kafka 0.10.1.0开始支持版本1, 它开始支持根据时间戳进行基于时间索引的查找,API做了一点改变来支持这个特性。注意这个API只支持开启 0.10消息格式的topic,否则返回UNSUPPORTED_FOR_MESSAGE_FORMAT错误。
// v0 ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]] ReplicaId => int32 TopicName => string Partition => int32 Time => int64 MaxNumberOfOffsets => int32 // v1 (supported in 0.10.1.0 and later) ListOffsetRequest => ReplicaId [TopicName [Partition Time]] ReplicaId => int32 TopicName => string Partition => int32 Time => int64
域 | 描述 |
---|---|
Time | 用来请求一定时间(毫秒)前的所有消息。这里有两个特殊取值:-1表示获取最后一个offset(也就是后面即将到来消息的offset值); -2表示获取最早的有效偏移量。注意,因为获取到偏移值都是降序排序,因此请求最早Offset的请求将总是返回一个值 |
// v0 OffsetResponse => [TopicName [PartitionOffsets]] PartitionOffsets => Partition ErrorCode [Offset] Partition => int32 ErrorCode => int16 Offset => int64 // v1 ListOffsetResponse => [TopicName [PartitionOffsets]] PartitionOffsets => Partition ErrorCode Timestamp [Offset] Partition => int32 ErrorCode => int16 Timestamp => int64 Offset => int64
可能的错误码(Possible Error Codes)
这些API使得偏移量的能够集中管理。了解更多 偏移量管理 。按照 Kafka-993 的评论,直到Kafka 0.8.1.1,这些API调用无法全部正常使用,他们这将在0.8.2版本中提供。
消费者组(Consumer Group)偏移量信息,由一个特定的broker维护,这个broker称为消费者组协调员。即消费者需要向从这个特定的broker提交和获取偏移量。可以通过发出一组协调员发现请求从而获得当前协调员信息。
GroupCoordinatorRequest => GroupId GroupId => string
GroupCoordinatorResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort ErrorCode => int16 CoordinatorId => int32 CoordinatorHost => string CoordinatorPort => int32
可能的错误码(Possible Error Codes)
v0 (supported in 0.8.1 or later) OffsetCommitRequest => ConsumerGroupId [TopicName [Partition Offset Metadata]] ConsumerGroupId => string TopicName => string Partition => int32 Offset => int64 Metadata => string v1 (supported in 0.8.2 or later) OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]] ConsumerGroupId => string ConsumerGroupGenerationId => int32 ConsumerId => string TopicName => string Partition => int32 Offset => int64 TimeStamp => int64 Metadata => string v2 (supported in 0.9.0 or later) OffsetCommitRequest => ConsumerGroup ConsumerGroupGenerationId ConsumerId RetentionTime [TopicName [Partition Offset Metadata]] ConsumerGroupId => string ConsumerGroupGenerationId => int32 ConsumerId => string RetentionTime => int64 TopicName => string Partition => int32 Offset => int64 Metadata => string
在V0和v1版本中,每个分区的时间戳作为提交时间戳定义,偏移量协调员将保存消费者所提交的偏移量,直到当前时间超过提交时间戳+偏移量保留时间,此偏移量保留时间在broker配置中指定;如果时间戳域没有设值,那么broker会将此值设定为接收到提交偏移量请求的时间,用户可以通过设置这个提交时间戳达到延长偏移量保存时间的目的。
在v2版本中,我们移除了时间戳域,但是增加了一个全局保存时间域(详情参见 KAFKA-1634 );broker会设置提交时间戳为接收到请求的时间,但是提交的偏移量能被保存到提交请求中用户指定的保存时间,如果这个保存时间没有设值(-1),那么broker会使用默认的保存时间。
注意,当这个API在“simple consumer”模式下使用,并非作为消费者组一员时,那么generationId必须被设置成-1,并且memberId必须为空(非null)。另外,如果有一个活动的消费者组有同样的groupId,那么提交Offset的请求将会被拒绝(一般会返回UNKNOWN_MEMBER_ID或者ILLEGAL_GENERATION错误)。
v0, v1 and v2: OffsetCommitResponse => [TopicName [Partition ErrorCode]]] TopicName => string Partition => int32 ErrorCode => int16
可能的错误码(Possible Error Codes)
OFFSET_METADATA_TOO_LARGE (12)
GROUP_LOAD_IN_PROGRESS (14)
GROUP_COORDINATOR_NOT_AVAILABLE (15)
NOT_COORDINATOR_FOR_GROUP (16)
ILLEGAL_GENERATION (22)
UNKNOWN_MEMBER_ID (25)
REBALANCE_IN_PROGRESS (27)
INVALID_COMMIT_OFFSET_SIZE (28)
TOPIC_AUTHORIZATION_FAILED (29)
GROUP_AUTHORIZATION_FAILED (30)
根据 KAFKA-1841 的注释,V0和V1是相同的,但V0(0.8.1或更高版本支持)从zookeeper读取的偏移量,而V1(0.8.2或更高版本支持)从Kafka读偏移量。
v0 and v1 (supported in 0.8.2 or after): OffsetFetchRequest => ConsumerGroup [TopicName [Partition]] ConsumerGroup => string TopicName => string Partition => int32
v0 and v1 (supported in 0.8.2 or after): OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]] TopicName => string Partition => int32 Offset => int64 Metadata => string ErrorCode => int16
请注意,消费者组中一个topic的分区如果没有偏移量,broker不会设定一个错误码(因为它不是一个真正的错误),但会返回空的元数据并将偏移字段为-1。
偏移量获取请求v0和v1版本之间没有格式上的区别。功能实现上来说,v0版本从zookeeper获取偏移量,v1版本从Kafka中获取偏移量。
可能的错误码(Possible Error Codes)
这些请求用于客户端参加Kafka所管理的消费者组。从更高层次上看,集群中每个消费者组都会分配一个broker(即消费者组协调员),以简化消费者组管理。一旦得到了组协调员地址(使用上面的消费者组协调员请求),组成员可以加入该组,同步状态,然后用心跳消息保持在组中的活跃状态。当客户端关闭时,它会使用离开组请求从消费者组中注销。此协议的语义在Kafka客户端分配协议中有详细描述。
组建管理接口的主要使用场景是消费者组,但这些请求也尽量设计得一般化以便支持其他应用场景(例如,Kafka Connect组)。这种设计的带来的代价就是是一些特定的组语义(group semantics)被推到了客户端实现。例如,下面定义的JoinGroup和SyncGroup请求无明确定义的字段以支持消费者组分区分配。相反,它们在其中包含有一些通用的字节数组(byte arrays),用这些字节数组就可以使得分区分配切入在消费者客户端实现。
但是,尽管这种实现允许每个客户端来实现来定义它们自己的嵌入schema,但是Kafka工具的兼容性要求这些客户端使用Kafka客户端使用的标准方案。例如,consumer-groups.sh这个应用程序会假定用这种格式来显示分区分配。因此,我们建议客户遵循相同的模式,使这些工具对所有客户端实现都可以正常工作。
加入组请求用于让客户端成为组的成员。当新成员加入一个现有组,之前加入的所有的会员必须通过发送一个新加入组的要求来重新入组。当成员第一次加入该组,成员编号将是空的(即“”),但重新加入的成员都应该使用与之前生成的相同的会员ID。
v0 supported in 0.9.0.0 and greater JoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols GroupId => string SessionTimeout => int32 MemberId => string ProtocolType => string GroupProtocols => [ProtocolName ProtocolMetadata] ProtocolName => string ProtocolMetadata => bytes v1 supported in 0.10.1.0 and greater JoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols GroupId => string SessionTimeout => int32 RebalanceTimeout => int32 MemberId => string ProtocolType => string GroupProtocols => [ProtocolName ProtocolMetadata] ProtocolName => string ProtocolMetadata => bytes
SessionTimeout字段指示客户端的存活。如果组协调员在session过期前没有收到一个心跳, 那么组员会被移除组。0.10.1前的版本, session timeout也被用作完成所需的 rebalance。一旦组管理员开始rebalance, 每一个组员会触发session timeout以便发送新的JoinGroup请求。 如果它们失败了,它们会从组中移除。 在0.10.1中,新版JoinGroup会使用一个独立的RebalanceTimeout来创建,一旦rebanlance开始,每个客户端触发过期以便重新加入,但是如果session timeout小于rebalance timeout, 客户端还是会持续发送heatbeat。
ProtocolType字段定义了该组实现的嵌入协议。组协调器确保该组中的所有成员都支持相同的协议类型。组中包含的协议(GroupProtocols)字段中的协议名称和元数据的含义取决于协议类型。请注意,加入群请求允许多协议/元数据对。这使得滚动升级时无需停机。协调器会选择所有成员支持的一种协议,升级后的成员既包括新版本和老版本的协议,一旦所有成员都升级,协调器将选择列在数组中最前面的组协议(GroupProtocol)。
消费者组: 下文我们定义了消费者组使用的嵌入协议。我们建议所有消费者客户端实现遵循这个格式,以便Kafka工具能够对所有的客户端正常工作
ProtocolType => "consumer" ProtocolName => AssignmentStrategy AssignmentStrategy => string ProtocolMetadata => Version Subscription UserData Version => int16 Subscription => [Topic] Topic => string UserData => bytes
UserData域的可以用来自定义分配策略。例如,在一个粘性分区策略实现中,这个字段可以包含之前的分配。在基于资源的分配策略,也可以包括每个运行消费者主机上的CPU个数等信息。
Kafka Connect使用“connect”的协议类型,和协议细节也是基于Connect的内部实现。
接收到来自该组中的所有成员组的加入组请求后,协调器将选择一个成员作为Leader,并且选择所有成员支持的协议。Leader将收到会员的完整列表与选择的协议相关的元数据。其他追随者成员,会收到一个空会员数组。Leader需要检查每个成员的元数据,并且使用下文中描述的SyncGroup请求来分配状态。
一旦加入组阶段完成,协调器会增加该组的GenerationId,这个Id是发送给每个成员的response中的一个字段,同时也会在心跳和偏移量提交请求中。当协调器重新rebalance了一个组,协调器将发送一个错误码,表示客户端成员需要重新加入组。如果重新平衡完成前成员未重入组(rejoin),那么它将有一个旧generationId,在新的请求使用这个旧Id时,这将导致ILLEGAL_GENERATION错误。
v0 and v1 supported in 0.9.0 and greater JoinGroupResponse => ErrorCode GenerationId GroupProtocol LeaderId MemberId Members ErrorCode => int16 GenerationId => int32 GroupProtocol => string LeaderId => string MemberId => string Members => [MemberId MemberMetadata] MemberId => string MemberMetadata => bytes
消费者组: 协调器负责选择所有成员都兼容协议(即分区分配策略),Leader是实际执行分配的成员,加入群请求可以包含多个分配策略,从而支持现有版本升级或者更改不同的分配策略。
可能的错误码(Possible Error Codes):
组长(group leader)使用同步组请求用来向当前组中的所有成员进行状态分配(例如分区分配)。所有成员加入该组后,立即发送SyncGroup,但只有Leader承担这个工作。
SyncGroupRequest => GroupId GenerationId MemberId GroupAssignment GroupId => string GenerationId => int32 MemberId => string GroupAssignment => [MemberId MemberAssignment] MemberId => string MemberAssignment => bytes
消费者组: 消费则组中MemberAssignment字段的格式如下:
MemberAssignment => Version PartitionAssignment Version => int16 PartitionAssignment => [Topic [Partition]] Topic => string Partition => int32 UserData => bytes
所有实现了“consumer”协议类型的客户端实现都需要支持这个scheme。
组中的每个成员都会接收到leader发出的同步组响应。
SyncGroupResponse => ErrorCode MemberAssignment ErrorCode => int16 MemberAssignment => bytes
可能的错误代码(Possible Error Codes):
每当一个成员加入并同步完成,他将开始发送心跳请求使自己留在组里。当协调器在配置的会话超时时间内没有他的收到心跳请求,该成员会被踢出该组。
HeartbeatRequest => GroupId GenerationId MemberId GroupId => string GenerationId => int32 MemberId => string
HeartbeatResponse => ErrorCode ErrorCode => int16
可能的错误代码(Possible Error Codes):
当想要离开组群时,用户可以发送一个退组请求。这优先于会话超时,因为它能使该组快速再平衡,这对于消费者而言这意味着可以用更短的时间将分区分配到一个活动的成员。
LeaveGroupRequest => GroupId MemberId GroupId => string MemberId => string
LeaveGroupResponse => ErrorCode ErrorCode => int16
可能的错误代码(Possible Error Codes):
该API可用于找到当前被broker管理的组群。为了得到集群内的所有组列表,你必须向所有broker发送组列表请求。
ListGroupsRequest =>
ListGroupsResponse => ErrorCode Groups ErrorCode => int16 Groups => [GroupId ProtocolType] GroupId => string ProtocolType => string
可能的错误代码(Possible Error Codes):
DescribeGroupsRequest => [GroupId] GroupId => string
DescribeGroupsResponse => [ErrorCode GroupId State ProtocolType Protocol Members] ErrorCode => int16 GroupId => string State => string ProtocolType => string Protocol => string Members => [MemberId ClientId ClientHost MemberMetadata MemberAssignment] MemberId => string ClientId => string ClientHost => string MemberMetadata => bytes MemberAssignment => bytes
可能的错误代码(Possible Error Codes):
下面是请求中ApiKey的数字值,用来表示上面所述的请求类型。
接口名称(API NAME) | APIKEY值 |
---|---|
ProduceRequest | 0 |
FetchRequest | 1 |
OffsetRequest | 2 |
MetadataRequest | 3 |
Non-user facing control APIs | 4-7 |
OffsetCommitRequest | 8 |
OffsetFetchRequest | 9 |
GroupCoordinatorRequest | 10 |
JoinGroupRequest | 11 |
HeartbeatRequest | 12 |
LeaveGroupRequest | 13 |
SyncGroupRequest | 14 |
DescribeGroupsRequest | 15 |
ListGroupsRequest | 16 |
我们用数字代码表示服务器发生的问题。这些可以由客户端转换成客户端中的异常(Exceptions)或者其他任何适当的错误处理机制。这里是当前正在使用的错误代码表:
错误名称(Error) | 编码(Code) | 是否可重试(Retriable) | Description | 描述 |
---|---|---|---|---|
NoError | 0 | No error–it worked! | 没有错误 | |
Unknown | -1 | An unexpected server error | 服务器未知错误 | |
OffsetOutOfRange | 1 | The requested offset is outside the range of offsets maintained by the server for the given topic/partition. | 请求的偏移量超过服务器维护的主题分区的偏移量。 | |
InvalidMessage / CorruptMessage | 2 | Yes | This indicates that a message contents does not match its CRC | 这个错误表示消息的内容与它的CRC校验码不符合。 |
UnknownTopicOrPartition | 3 | Yes | This request is for a topic or partition that does not exist on this broker. | broker上不存在所请求的主题或者分区。 |
InvalidMessageSize | 4 | The message has a negative size | 消息长度为负数。 | |
LeaderNotAvailable | 5 | Yes | This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes. | 这个错误会在leader选举之间抛出,一样那位此时这个分区没有leader因此不能被写入。 |
NotLeaderForPartition | 6 | Yes | This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date. | 这个错误表示客户端正在把消息发送给副本,而不是分区的leader。这说明客户端的元数据已经过期。 |
RequestTimedOut | 7 | Yes | This error is thrown if the request exceeds the user-specified time limit in the request. | 当这个请求超过了用户自定义的请求时间限制抛出此错误 |
BrokerNotAvailable | 8 | This is not a client facing error and is used mostly by tools when a broker is not alive. | 这个不是客户端所能接受到的错误,一般被工具用在broker没有活动的场合。 | |
ReplicaNotAvailable | 9 | If replica is expected on a broker, but is not (this can be safely ignored). | 当broker希望有副本而实际上并没有时抛出(这个错误可以被安全地忽略)。 | |
MessageSizeTooLarge | 10 | The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum. | 当服务器配置了一个最大消息长度以避免无限制的内存分配时,客户端产生了一个超过这个最大值的消息会抛出此错误。 | |
StaleControllerEpochCode | 11 | Internal error code for broker-to-broker communication. | broker之间内部通讯是的错误。 | |
OffsetMetadataTooLargeCode | 12 | If you specify a string larger than configured maximum for offset metadata | 如果你赋了一个超过所配置的最大偏移量元数据的字符串时触发。 | |
GroupLoadInProgressCode | 14 | Yes | The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition), or in response to group membership requests (such as heartbeats) when group metadata is being loaded by the coordinator. | broker会在以下情况下返回这个错误:当broker人在加载偏移量时(主题分区的leader发生变化后)请求偏移量获取请求;或者正在反馈组成员请求(比如心跳)时,组的元数据正在被协调器加载。 |
GroupCoordinatorNotAvailableCode | 15 | Yes | The broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active. | 组协调器请求,偏移量提交和大部分组管理请求时,偏移量主题还没有被建立或者组协调器还没有激活是broker会返回此错误。 |
NotCoordinatorForGroupCode | 16 | Yes | The broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for. | 非该组协调器的broker接收到一个偏移量获取或提交请求时返回此错误。 |
InvalidTopicCode | 17 | For a request which attempts to access an invalid topic (e.g. one which has an illegal name), or if an attempt is made to write to an internal topic (such as the consumer offsets topic). | 请求指令尝试访问一个非法的主题(例如,一个包含非法名称的主题),或者尝试写入一个内部主题(例如消费者偏移量主题)。 | |
RecordListTooLargeCode | 18 | If a message batch in a produce request exceeds the maximum configured segment size. | 批处理消息片段数组的长度超过了配置的最大消息片段数。 | |
NotEnoughReplicasCode | 19 | Yes | Returned from a produce request when the number of in-sync replicas is lower than the configured minimum and requiredAcks is -1. | 当同步中的副本数量小于配置的最小数量,并且requiredAcks设置为-1时返回此错误 |
NotEnoughReplicasAfterAppendCode | 20 | Yes | Returned from a produce request when the message was written to the log, but with fewer in-sync replicas than required. | 消息已经写入日志文件,但是同步中的副本数量比请求中要求的数量少时返回此错误码 |
InvalidRequiredAcksCode | 21 | Returned from a produce request if the requested requiredAcks is invalid (anything other than -1, 1, or 0). | 请求的requiredAcks非法(任何非-1,1或者0)时返回此错误码。 | |
IllegalGenerationCode | 22 | Returned from group membership requests (such as heartbeats) when the generation id provided in the request is not the current generation. | 组籍管理请求(诸如心跳请求)时generation id不是与当前不一致时返回此错误码 | |
InconsistentGroupProtocolCode | 23 | Returned in join group when the member provides a protocol type or set of protocols which is not compatible with the current group. | 加入组请求时成员提供的协议类型或者协议类型组与当前组不兼容时返回。 | |
InvalidGroupIdCode | 24 | Returned in join group when the groupId is empty or null. | 加入组请求时groupId为空或者null是返回。 | |
UnknownMemberIdCode | 25 | Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation. | 组请求(偏移量提交/获取,心跳等)时memberId不在当前的generation。 | |
InvalidSessionTimeoutCode | 26 | Return in join group when the requested session timeout is outside of the allowed range on the broker | 加入组请求时请求的会话超时超过broker允许的限制。 | |
RebalanceInProgressCode | 27 | Returned in heartbeat requests when the coordinator has begun rebalancing the group. This indicates to the client that it should rejoin the group. | 心跳请求时协调器已经开始了组的再平衡,这意味着客户端必须重新加入组。 | |
InvalidCommitOffsetSizeCode | 28 | This error indicates that an offset commit was rejected because of oversize metadata. | 这个错意味着偏移量提交因为超过了元数据大小而被拒绝。 | |
TopicAuthorizationFailedCode | 29 | Returned by the broker when the client is not authorized to access the requested topic. | 客户端没有访问请求主题的权限时,broker返回此错误。 | |
GroupAuthorizationFailedCode | 30 | Returned by the broker when the client is not authorized to access a particular groupId. | 客户端没有访问特定groupId的权限时,broker返回此错误。 | |
ClusterAuthorizationFailedCode | 31 | Returned by the broker when the client is not authorized to use an inter-broker or administrative API. | 客户端没有权限访问broker之间的接口或者管理接口时,broker返回此错误。 |
有些人问,为什么我们不使用HTTP。有许多原因,最主要的是客户端实现可以使用一些更高级的TCP特性–请求的多工(multiplex)能力(译者注:同一个TCP连接中同时发送多个请求,http长连接必须等到前一次请求结束才能发送后一个请求,否则需要多个http连接),同时轮询多个连接的能力,等等。我们还发现HTTP库在许多编程语言中非常是出奇地破旧(shabby -_-!)。
还有人问,也许我们可以支持许多不同的协议。此前的经验是,多协议支持的是很难添加和测试新功能,因为他们要被移植到许多协议实现中。我们感觉,大多数用户并不在乎支持多个协议这些特性,他们只是希望在自己选择的语言中实现了良好可靠的客户端。
另一个问题是,为什么我们不采用XMPP,STOMP,AMQP或现有的协议。这个问题的不同协议有不同答案,但在共通的问题是,这些协议的确确定了大部分实现,但如果我们没有协议的控制权,我们就实现不了我们的功能。我们相信,我们可以实现比现有消息系统更好的真正的分布式消息系统,但要做到这一点,我们需要建立不同的工作模式。
最后一个问题是,为什么我们不使用的Protocol Buffers或Thrift来定义我们的请求消息格式。这些库擅长帮助您管理非常多的序列化的消息。然而,我们只有几个消息。而且这些库跨语言的支持是有点参差不齐(取决于软件包)。最后,我们颇为谨慎地管理二进制日志格式和传输协议之间的映射,而用如果使用这些系统将变得不太可能。最后,我们比较喜欢让API有明确的版本并且通过检查版来引入原本为空的新值,因为它能更细致地控制兼容性。