原创文章,转载请务必将下面这段话置于文章开头处。 本文转发自Jason’s Blog,原文链接 http://www.jasongj.com/2015/01/02/Kafka深度解析
Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:
如上图所示,一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个 Zookeeper 集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。
作为一个messaging system,Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。一些logging-centric system,比如Facebook的 Scribe 和Cloudera的 Flume ,采用非常不同的push模式。事实上,push模式和pull模式各有优劣。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
Topic在逻辑上可以被认为是一个queue。每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。
每个日志文件都是“log entries”序列,每一个 log entry
包含一个4字节整型数(值为N),其后跟N个字节的消息体。每条消息都有一个当前partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:
message length : 4 bytes (value: 1+4+n)
“magic” value : 1 byte
crc : 4 bytes
payload : n bytes
这个“log entries”并非由一个文件构成,而是分成多个segment,每个segment名为该segment第一条消息的offset和“.kafka”组成。另外会有一个索引文件,它标明了每个segment下包含的 log entry
的offset范围,如下图所示。
因为每条消息都被append到该partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在创建topic时可以在 $KAFKA_HOME/config/server.properties
中指定这个partition的数量(如下所示),当然也可以在topic创建之后去修改parition数量。
<span class="line"><span class="comment"># The default number of log partitions per topic. More partitions allow greater</span></span> <span class="line"><span class="comment"># parallelism for consumption, but this will also result in more files across</span></span> <span class="line"><span class="comment"># the brokers.</span></span> <span class="line">num.partitions=<span class="number">3</span></span>1234<spanclass="line"><spanclass="comment"># The default number of log partitions per topic. More partitions allow greater</span></span><spanclass="line"><spanclass="comment"># parallelism for consumption, but this will also result in more files across</span></span><spanclass="line"><spanclass="comment"># the brokers.</span></span><spanclass="line">num.partitions=<spanclass="number">3</span></span>
在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。paritition机制可以通过指定producer的paritition. class这一参数来指定,该class必须实现 kafka.producer.Partitioner
接口。本例中如果key可以被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每个parition都会有个序号)
<span class="line"><span class="keyword">import</span> kafka.producer.Partitioner;</span> <span class="line"><span class="keyword">import</span> kafka.utils.VerifiableProperties;</span> <span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">JasonPartitioner</span><<span class="title">T</span>> <span class="keyword">implements</span> <span class="title">Partitioner</span> </span>{</span> <span class="line"> <span class="function"><span class="keyword">public</span> <span class="title">JasonPartitioner</span><span class="params">(VerifiableProperties verifiableProperties)</span> </span>{}</span> <span class="line"> <span class="annotation">@Override</span></span> <span class="line"> <span class="function"><span class="keyword">public</span> <span class="keyword">int</span> <span class="title">partition</span><span class="params">(Object key, <span class="keyword">int</span> numPartitions)</span> </span>{</span> <span class="line"> <span class="keyword">try</span> {</span> <span class="line"> <span class="keyword">int</span> partitionNum = Integer.parseInt((String) key);</span> <span class="line"> <span class="keyword">return</span> Math.abs(Integer.parseInt((String) key) % numPartitions);</span> <span class="line"> } <span class="keyword">catch</span> (Exception e) {</span> <span class="line"> <span class="keyword">return</span> Math.abs(key.hashCode() % numPartitions);</span> <span class="line"> }</span> <span class="line"> }</span> <span class="line">}</span> <span class="line"> </span> <span class="line"> 如果将上例中的<span class="class"><span class="keyword">class</span>作为<span class="title">partition</span>.<span class="title">class</span>,并通过如下代码发送20条消息(<span class="title">key</span>分别为0,1,2,3)至<span class="title">topic2</span>(包含4个<span class="title">partition</span>)。</span> <span class="line"> </span> <span class="line"><span class="title">public</span> <span class="title">void</span> <span class="title">sendMessage</span>() <span class="title">throws</span> <span class="title">InterruptedException</span></span>{</span> <span class="line"> <span class="keyword">for</span>(<span class="keyword">int</span> i = <span class="number">1</span>; i <= <span class="number">5</span>; i++){</span> <span class="line"> List messageList = <span class="keyword">new</span> ArrayList<KeyedMessage<String, String>>();</span> <span class="line"> <span class="keyword">for</span>(<span class="keyword">int</span> j = <span class="number">0</span>; j < <span class="number">4</span>; j++){</span> <span class="line"> messageList.add(<span class="keyword">new</span> KeyedMessage<String, String>(<span class="string">"topic2"</span>, j+<span class="string">""</span>, <span class="string">"The "</span> + i + <span class="string">" message for key "</span> + j));</span> <span class="line"> }</span> <span class="line"> producer.send(messageList);</span> <span class="line"> }</span> <span class="line"> producer.close();</span> <span class="line">}</span>12345678910111213141516171819202122232425262728293031<spanclass="line"><spanclass="keyword">import</span>kafka.producer.Partitioner;</span><spanclass="line"><spanclass="keyword">import</span>kafka.utils.VerifiableProperties;</span> <spanclass="line"><spanclass="keyword">public</span><spanclass="class"><spanclass="keyword">class</span><spanclass="title">JasonPartitioner</span><<spanclass="title">T</span>><spanclass="keyword">implements</span><spanclass="title">Partitioner</span></span>{</span> <spanclass="line"> <spanclass="function"><spanclass="keyword">public</span><spanclass="title">JasonPartitioner</span><spanclass="params">(VerifiablePropertiesverifiableProperties)</span></span>{}</span> <spanclass="line"> <spanclass="annotation">@Override</span></span><spanclass="line"> <spanclass="function"><spanclass="keyword">public</span><spanclass="keyword">int</span><spanclass="title">partition</span><spanclass="params">(Objectkey,<spanclass="keyword">int</span>numPartitions)</span></span>{</span><spanclass="line"> <spanclass="keyword">try</span>{</span><spanclass="line"> <spanclass="keyword">int</span>partitionNum=Integer.parseInt((String)key);</span><spanclass="line"> <spanclass="keyword">return</span>Math.abs(Integer.parseInt((String)key)%numPartitions);</span><spanclass="line"> }<spanclass="keyword">catch</span>(Exceptione){</span><spanclass="line"> <spanclass="keyword">return</span>Math.abs(key.hashCode()%numPartitions);</span><spanclass="line"> }</span><spanclass="line"> }</span><spanclass="line">}</span><spanclass="line"></span><spanclass="line">如果将上例中的<spanclass="class"><spanclass="keyword">class</span>作为<spanclass="title">partition</span>.<spanclass="title">class</span>,并通过如下代码发送20条消息(<spanclass="title">key</span>分别为0,1,2,3)至<spanclass="title">topic2</span>(包含4个<spanclass="title">partition</span>)。</span><spanclass="line"></span> <spanclass="line"><spanclass="title">public</span><spanclass="title">void</span><spanclass="title">sendMessage</span>()<spanclass="title">throws</span><spanclass="title">InterruptedException</span></span>{</span><spanclass="line"><spanclass="keyword">for</span>(<spanclass="keyword">int</span>i=<spanclass="number">1</span>;i<=<spanclass="number">5</span>;i++){</span><spanclass="line"> ListmessageList=<spanclass="keyword">new</span>ArrayList<KeyedMessage<String,String>>();</span><spanclass="line"> <spanclass="keyword">for</span>(<spanclass="keyword">int</span>j=<spanclass="number">0</span>;j<<spanclass="number">4</span>;j++){</span><spanclass="line"> messageList.add(<spanclass="keyword">new</span>KeyedMessage<String,String>(<spanclass="string">"topic2"</span>,j+<spanclass="string">""</span>,<spanclass="string">"The "</span>+i+<spanclass="string">" message for key "</span>+j));</span><spanclass="line"> }</span><spanclass="line"> producer.send(messageList);</span><spanclass="line"> }</span><spanclass="line">producer.close();</span><spanclass="line">}</span>
则key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和partition序号相同。(partition序号从0开始,本例中的key也正好从0开始)。如下图所示。
对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置 $KAFKA_HOME/config/server.properties
,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据,如下所示。
<span class="line"> <span class="comment">############################# Log Retention Policy #############################</span></span> <span class="line"><span class="comment"># The following configurations control the disposal of log segments. The policy can</span></span> <span class="line"><span class="comment"># be set to delete segments after a period of time, or after a given size has accumulated.</span></span> <span class="line"><span class="comment"># A segment will be deleted whenever *either* of these criteria are met. Deletion always happens</span></span> <span class="line"><span class="comment"># from the end of the log.</span></span> <span class="line"><span class="comment"># The minimum age of a log file to be eligible for deletion</span></span> <span class="line">log.retention.hours=<span class="number">168</span></span> <span class="line"><span class="comment"># A size-based retention policy for logs. Segments are pruned from the log as long as the remaining</span></span> <span class="line"><span class="comment"># segments don't drop below log.retention.bytes.</span></span> <span class="line"><span class="comment">#log.retention.bytes=1073741824</span></span> <span class="line"><span class="comment"># The maximum size of a log segment file. When this size is reached a new log segment will be created.</span></span> <span class="line">log.segment.bytes=<span class="number">1073741824</span></span> <span class="line"><span class="comment"># The interval at which log segments are checked to see if they can be deleted according</span></span> <span class="line"><span class="comment"># to the retention policies</span></span> <span class="line">log.retention.check.interval.ms=<span class="number">300000</span></span> <span class="line"><span class="comment"># By default the log cleaner is disabled and the log retention policy will default to </span></span> <span class="line"><span class="comment">#just delete segments after their retention expires.</span></span> <span class="line"><span class="comment"># If log.cleaner.enable=true is set the cleaner will be enabled and individual logs </span></span> <span class="line"><span class="comment">#can then be marked for log compaction.</span></span> <span class="line">log.cleaner.enable=<span class="literal">false</span></span>1234567891011121314151617181920212223242526<spanclass="line"><spanclass="comment">############################# Log Retention Policy #############################</span></span> <spanclass="line"><spanclass="comment"># The following configurations control the disposal of log segments. The policy can</span></span><spanclass="line"><spanclass="comment"># be set to delete segments after a period of time, or after a given size has accumulated.</span></span><spanclass="line"><spanclass="comment"># A segment will be deleted whenever *either* of these criteria are met. Deletion always happens</span></span><spanclass="line"><spanclass="comment"># from the end of the log.</span></span> <spanclass="line"><spanclass="comment"># The minimum age of a log file to be eligible for deletion</span></span><spanclass="line">log.retention.hours=<spanclass="number">168</span></span> <spanclass="line"><spanclass="comment"># A size-based retention policy for logs. Segments are pruned from the log as long as the remaining</span></span><spanclass="line"><spanclass="comment"># segments don't drop below log.retention.bytes.</span></span><spanclass="line"><spanclass="comment">#log.retention.bytes=1073741824</span></span> <spanclass="line"><spanclass="comment"># The maximum size of a log segment file. When this size is reached a new log segment will be created.</span></span><spanclass="line">log.segment.bytes=<spanclass="number">1073741824</span></span> <spanclass="line"><spanclass="comment"># The interval at which log segments are checked to see if they can be deleted according</span></span><spanclass="line"><spanclass="comment"># to the retention policies</span></span><spanclass="line">log.retention.check.interval.ms=<spanclass="number">300000</span></span> <spanclass="line"><spanclass="comment"># By default the log cleaner is disabled and the log retention policy will default to </span></span><spanclass="line"><spanclass="comment">#just delete segments after their retention expires.</span></span><spanclass="line"><spanclass="comment"># If log.cleaner.enable=true is set the cleaner will be enabled and individual logs </span></span><spanclass="line"><spanclass="comment">#can then be marked for log compaction.</span></span><spanclass="line">log.cleaner.enable=<spanclass="literal">false</span></span>
这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个consumer group保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
Kafka从0.8开始提供partition级别的replication,replication的数量可在 $KAFKA_HOME/config/server.properties
中配置。
<span class="line">default.replication.factor = <span class="number">1</span></span>1<spanclass="line">default.replication.factor=<spanclass="number">1</span></span>
该 Replication与leader election配合提供了自动的failover机制。replication对Kafka的吞吐率是有一定影响的,但极大的增强了可用性。默认情况下,Kafka的replication数量为1。 每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,leader批量从leader上pull数据。一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。follower上的日志和其leader上的完全一样。
和大部分分布式系统一样,Kakfa处理失败需要明确定义一个broker是否alive。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过Zookeeper的heartbeat机制来实现)。二是follower必须能够及时将leader的writing复制过来,不能“落后太多”。
leader会track“in sync”的node list。如果一个follower宕机,或者落后太多,leader将把它从”in sync” list中移除。这里所描述的“落后太多”指follower复制的消息落后于leader后的条数超过预定值,该值可在 $KAFKA_HOME/config/server.properties
中配置
<span class="line"><span class="comment">#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead</span></span> <span class="line">replica.lag.max.messages=<span class="number">4000</span></span> <span class="line"><span class="comment">#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead</span></span> <span class="line">replica.lag.time.max.ms=<span class="number">10000</span></span>12345<spanclass="line"><spanclass="comment">#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead</span></span><spanclass="line">replica.lag.max.messages=<spanclass="number">4000</span></span> <spanclass="line"><spanclass="comment">#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead</span></span><spanclass="line">replica.lag.time.max.ms=<spanclass="number">10000</span></span>
需要说明的是,Kafka只解决”fail/recover”,不处理“Byzantine”(“拜占庭”)问题。
一条消息只有被“in sync” list里的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失(consumer无法消费这些数据)。而对于producer而言,它可以选择是否等待消息commit,这可以通过 request.required.acks
来设置。这种机制确保了只要“in sync” list有一个或以上的flollower,一条被commit的消息就不会丢失。
这里的复制机制即不是同步复制,也不是单纯的异步复制。事实上,同步复制要求“活着的”follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follwer都落后于leader,而leader突然宕机,则会丢失数据。而Kafka的这种使用“in sync” list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower可以批量的从leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了follower与leader的差距(前文有说到,只要follower落后leader不太远,则被认为在“in sync” list里)。
上文说明了Kafka是如何做replication的,另外一个很重要的问题是当leader宕机了,怎样在follower中选举出新的leader。因为follower可能落后许多或者crash了,所以必须确保选择“最新”的follower作为新的leader。一个基本的原则就是,如果leader不在了,新的leader必须拥有原来的leader commit的所有消息。这就需要作一个折衷,如果leader在标明一条消息被commit前等待更多的follower确认,那在它die之后就有更多的follower可以作为新的leader,但这也会造成吞吐率的下降。
一种非常常用的选举leader的方式是“majority vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个replica(包含leader和follower),那在commit之前必须保证有f+1个replica复制完消息,为了保证正确选出新的leader,fail的replica不能超过f个。因为在剩下的任意f+1个replica里,至少有一个replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几台server,也就是说,如果replication factor是3,那latency就取决于最快的那个follower而非最慢那个。majority vote也有一些劣势,为了保证leader election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的replica,如果要容忍2个follower挂掉,必须要有5个以上的replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的replica,而大量的replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在 Zookeeper 这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA feature是基于 majority-vote-based journal ,但是它的数据存储并没有使用这种expensive的方式。
实际上,leader election算法非常多,比如Zookeper的 Zab , Raft 和 Viewstamped Replication 。而Kafka所使用的leader election算法更像微软的 PacificA 算法。
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas) set,这个set里的所有replica都跟上了leader,只有ISR里的成员才有被选为leader的可能。在这种模式下,对于f+1个replica,一个Kafka topic能在保证不丢失已经ommit的消息的前提下容忍f个replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个replica的失败,majority vote和ISR在commit前需要等待的replica数量是一样的,但是ISR需要的总的replica的个数几乎是majority vote的一半。
虽然majority vote与ISR相比有不需等待最慢的server这一优势,但是Kafka作者认为Kafka可以通过producer选择是否被commit阻塞来改善这一问题,并且节省下来的replica和磁盘使得ISR模式仍然值得。
上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
这就需要在可用性和一致性当中作出一个简单的平衡。如果一定要等待ISR中的replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有replica都无法“活”过来了,或者数据都丢失了,这个partition将永远不可用。选择第一个“活”过来的replica作为leader,而这个replica不是ISR中的replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为leader而作为consumer的数据源(前文有说明,所有读写都由leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
上文说明了一个parition的replication过程,然尔Kafka集群需要管理成百上千个partition,Kafka通过round-robin的方式来平衡partition从而避免大量partition集中在了少数几个节点上。同时Kafka也需要平衡leader的分布,尽可能的让所有partition的leader均匀分布在不同broker上。另一方面,优化leadership election的过程也是很重要的,毕竟这段时间相应的partition处于不可用状态。一种简单的实现是暂停宕机的broker上的所有partition,并为之选举leader。实际上,Kafka选举一个broker作为controller,这个controller通过watch Zookeeper检测所有的broker failure,并负责为所有受影响的parition选举leader,再将相应的leader调整命令发送至受影响的broker,过程如下图所示。
这样做的好处是,可以批量的通知leadership的变化,从而使得选举过程成本更低,尤其对大量的partition而言。如果controller失败了,幸存的所有broker都会尝试在Zookeeper中创建/controller->{this broker id},如果创建成功(只可能有一个创建成功),则该broker会成为controller,若创建不成功,则该broker会等待新controller的命令。
(本节所有描述都是基于consumer hight level API而非low level API)。
每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)
很多传统的message queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证queue的长度比较少,提高效率。而如上文所将,Kafka并不删除已消费的消息,为了实现传统message queue消息只被消费一次的语义,Kafka保证保证同一个consumer group里只有一个consumer会消费一条消息。与传统message queue不同的是,Kafka还允许不同consumer group同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的consumer在不同的consumer group即可。下图展示了Kafka在Linkedin的一种简化部署。
为了更清晰展示Kafka consumer group的特性,笔者作了一项测试。创建一个topic (名为topic1),创建一个属于group1的consumer实例,并创建三个属于group2的consumer实例,然后通过producer向topic1发送key分别为1,2,3r的消息。结果发现属于group1的consumer收到了所有的这三条消息,同时group2中的3个consumer分别收到了key为1,2,3的消息。如下图所示。
(本节所讲述内容均基于Kafka consumer high level API)
Kafka保证同一consumer group中只有一个consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。这样设计的劣势是无法让同一个consumer group里的consumer均匀消费数据,优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据也是有序被消费。
如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据,如果consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据,而如果consumer的数量多于partition的数量时,会有部分consumer无法消费该topic下任何一条消息。
如下例所示,如果topic1有0,1,2共三个partition,当group1只有一个consumer(名为consumer1)时,该 consumer可消费这3个partition的所有数据。
增加一个consumer(consumer2)后,其中一个consumer(consumer1)可消费2个partition的数据,另外一个consumer(consumer2)可消费另外一个partition的数据。
再增加一个consumer(consumer3)后,每个consumer可消费一个partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2
再增加一个consumer(consumer4)后,其中3个consumer可分别消费一个partition的数据,另外一个consumer(consumer4)不能消费topic1任何数据。
此时关闭consumer1,剩下的consumer可分别消费一个partition的数据。
接着关闭consumer2,剩下的consumer3可消费2个partition,consumer4可消费1个partition。
再关闭consumer3,剩下的consumer4可同时消费topic1的3个partition。
consumer rebalance算法如下:
目前consumer rebalance的控制策略是由每一个consumer通过Zookeeper完成的。具体的控制方式如下:
Force itself to rebalance within in its consumer group.
在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。
目前(2015-01-19)最新版(0.8.2)Kafka采用的是上述方式。但该方式有不利的方面:
根据Kafka官方文档,Kafka作者正在考虑在还未发布的 0.9.x版本中使用中心协调器(coordinator) 。大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有partition或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance在所有相关的consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功(这个过程跟replication controller非常类似,所以我很奇怪为什么当初设计replication controller时没有使用类似方式来解决consumer rebalance的问题)。流程如下:
通过上文介绍,想必读者已经明天了producer和consumer是如何工作的,以及Kafka是如何做replication的,接下来要讨论的是Kafka如何确保消息在producer和consumer之间传输。有这么几种可能的delivery guarantee:
At most once
消息可能会丢,但绝不会重复传输 At least one
消息绝不会丢,但可能会重复传输 Exactly once
每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。 Kafka的delivery guarantee semantic非常直接。当producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经commit。这一点有点像向一个自动生成primary key的数据库表中插入数据。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于primary key的东西,发生故障时幂等性的retry多次,这样就做到了 Exactly one
。截止到目前(Kafka 0.8.2版本,2015-01-25),这一feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从producer和broker是确保了 At least once
,但可通过设置producer异步发送实现 At most once
)。
接下来讨论的是消息从broker到consumer的delivery guarantee semantic。(仅针对Kafka consumer high level API)。consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了 Exactly once
。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
At most once
At least once
。在很多情况使用场景下,消息都有一个primary key,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是 Exactly once
。(人个感觉这种说法有些牵强,毕竟它不是Kafka本身提供的机制,而且primary key本身不保证操作的幂等性。而且实际上我们说delivery guarantee semantic是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们的系统不应该把处理过程的特性–如是否幂等性,当成Kafka本身的feature) Exactly once
,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现 Exactly once
。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中) At least once
,并且允许通过设置producer异步提交来实现 At most once
。而 Exactly once
要求与目标存储系统协作,幸运的是Kafka提供的offset可以使用这种方式非常直接非常容易。 纸上得来终觉浅,绝知些事要躬行。笔者希望能亲自测一下Kafka的性能,而非从网上找一些测试数据。所以笔者曾在0.8发布前两个月做过详细的Kafka0.8性能测试,不过很可惜测试报告不慎丢失。所幸在网上找到了Kafka的创始人之一的 Jay Kreps的bechmark 。以下描述皆基于该benchmark。(该benchmark基于Kafka0.8.1)
该benchmark用到了六台机器,机器配置如下
1Gb Ethernet
这6台机器其中3台用来搭建Kafka broker集群,另外3台用来安装Zookeeper及生成测试数据。6个drive都直接以非RAID方式挂载。实际上kafka对机器的需求与Hadoop的类似。
该项测试只测producer的吞吐率,也就是数据只被持久化,没有consumer读数据。
在这一测试中,创建了一个包含6个partition且没有replication的topic。然后通过一个线程尽可能快的生成50 million条比较短(payload100字节长)的消息。测试结果是 821,557 records/second ( 78.3MB/second )。
之所以使用短消息,是因为对于消息系统来说这种使用场景更难。因为如果使用MB/second来表征吞吐率,那发送长消息无疑能使得测试结果更好。
整个测试中,都是用每秒钟delivery的消息的数量乘以payload的长度来计算MB/second的,没有把消息的元信息算在内,所以实际的网络使用量会比这个大。对于本测试来说,每次还需传输额外的22个字节,包括一个可选的key,消息长度描述,CRC等。另外,还包含一些请求相关的overhead,比如topic,partition,acknowledgement等。这就导致我们比较难判断是否已经达到网卡极限,但是把这些overhead都算在吞吐率里面应该更合理一些。因此,我们已经基本达到了网卡的极限。
初步观察此结果会认为它比人们所预期的要高很多,尤其当考虑到Kafka要把数据持久化到磁盘当中。实际上,如果使用随机访问数据系统,比如RDBMS,或者key-velue store,可预期的最高访问频率大概是5000到50000个请求每秒,这和一个好的RPC层所能接受的远程请求量差不多。而该测试中远超于此的原因有两个。
该项测试与上一测试基本一样,唯一的区别是每个partition有3个replica(所以网络传输的和写入磁盘的总的数据量增加了3倍)。每一个broker即要写作为leader的partition,也要读(从leader读数据)写(将数据写到磁盘)作为follower的partition。测试结果为 786,980 records/second ( 75.1MB/second )。
该项测试中replication是异步的,也就是说broker收到数据并写入本地磁盘后就acknowledge producer,而不必等所有replica都完成replication。也就是说,如果leader crash了,可能会丢掉一些最新的还未备份的数据。但这也会让message acknowledgement延迟更少,实时性更好。
这项测试说明,replication可以很快。整个集群的写能力可能会由于3倍的replication而只有原来的三分之一,但是对于每一个producer来说吞吐率依然足够好。
该项测试与上一测试的唯一区别是replication是同步的,每条消息只有在被 in sync
集合里的所有replica都复制过去后才会被置为committed(此时broker会向producer发送acknowledgement)。在这种模式下,Kafka可以保证即使leader crash了,也不会有数据丢失。测试结果为 421,823 records/second ( 40.2MB/second )。
Kafka同步复制与异步复制并没有本质的不同。leader会始终track follower replica从而监控它们是否还alive,只有所有 in sync
集合里的replica都acknowledge的消息才可能被consumer所消费。而对follower的等待影响了吞吐率。可以通过增大batch size来改善这种情况,但为了避免特定的优化而影响测试结果的可比性,本次测试并没有做这种调整。
该测试相当于把上文中的1个producer,复制到了3台不同的机器上(在1台机器上跑多个实例对吞吐率的增加不会有太大帮忙,因为网卡已经基本饱和了),这3个producer同时发送数据。整个集群的吞吐率为 2,024,032 records/second ( 193,0MB/second )。
消息系统的一个潜在的危险是当数据能都存于内存时性能很好,但当数据量太大无法完全存于内存中时(然后很多消息系统都会删除已经被消费的数据,但当消费速度比生产速度慢时,仍会造成数据的堆积),数据会被转移到磁盘,从而使得吞吐率下降,这又反过来造成系统无法及时接收数据。这样就非常糟糕,而实际上很多情景下使用queue的目的就是解决数据消费速度和生产速度不一致的问题。
但Kafka不存在这一问题,因为Kafka始终以O(1)的时间复杂度将数据持久化到磁盘,所以其吞吐率不受磁盘上所存储的数据量的影响。为了验证这一特性,做了一个长时间的大数据量的测试,下图是吞吐率与数据量大小的关系图。
上图中有一些variance的存在,并可以明显看到,吞吐率并不受磁盘上所存数据量大小的影响。实际上从上图可以看到,当磁盘数据量达到1TB时,吞吐率和磁盘数据只有几百MB时没有明显区别。
这个variance是由Linux I/O管理造成的,它会把数据缓存起来再批量flush。上图的测试结果是在生产环境中对Kafka集群做了些tuning后得到的,这些tuning方法可参考 这里 。
需要注意的是,replication factor并不会影响consumer的吞吐率测试,因为consumer只会从每个partition的leader读数据,而与replicaiton factor无关。同样,consumer吞吐率也与同步复制还是异步复制无关。
该测试从有6个partition,3个replication的topic消费50 million的消息。测试结果为 940,521 records/second ( 89.7MB/second )。
可以看到,Kafkar的consumer是非常高效的。它直接从broker的文件系统里读取文件块。Kafka使用 sendfile API 来直接通过操作系统直接传输,而不用把数据拷贝到用户空间。该项测试实际上从log的起始处开始读数据,所以它做了真实的I/O。在生产环境下,consumer可以直接读取producer刚刚写下的数据(它可能还在缓存中)。实际上,如果在生产环境下跑 I/O stat ,你可以看到基本上没有物理“读”。也就是说生产环境下consumer的吞吐率会比该项测试中的要高。
将上面的consumer复制到3台不同的机器上,并且并行运行它们(从同一个topic上消费数据)。测试结果为 2,615,968 records/second ( 249.5MB/second )。
正如所预期的那样,consumer的吞吐率几乎线性增涨。
上面的测试只是把producer和consumer分开测试,而该项测试同时运行producer和consumer,这更接近使用场景。实际上目前的replication系统中follower就相当于consumer在工作。
该项测试,在具有6个partition和3个replica的topic上同时使用1个producer和1个consumer,并且使用异步复制。测试结果为 795,064 records/second ( 75.8MB/second )。
可以看到,该项测试结果与单独测试1个producer时的结果几乎一致。所以说consumer非常轻量级。
上面的所有测试都基于短消息(payload 100字节),而正如上文所说,短消息对Kafka来说是更难处理的使用方式,可以预期,随着消息长度的增大,records/second会减小,但MB/second会有所提高。下图是records/second与消息长度的关系图。
正如我们所预期的那样,随着消息长度的增加,每秒钟所能发送的消息的数量逐渐减小。但是如果看每秒钟发送的消息的总大小,它会随着消息长度的增加而增加,如下图所示。
从上图可以看出,当消息长度为10字节时,因为要频繁入队,花了太多时间获取锁,CPU成了瓶颈,并不能充分利用带宽。但从100字节开始,我们可以看到带宽的使用逐渐趋于饱和(虽然MB/second还是会随着消息长度的增加而增加,但增加的幅度也越来越小)。
上文中讨论了吞吐率,那消息传输的latency如何呢?也就是说消息从producer到consumer需要多少时间呢?该项测试创建1个producer和1个consumer并反复计时。结果是, 2 ms (median), 3ms (99th percentile, 14ms (99.9th percentile) 。
(这里并没有说明topic有多少个partition,也没有说明有多少个replica,replication是同步还是异步。实际上这会极大影响producer发送的消息被commit的latency,而只有committed的消息才能被consumer所消费,所以它会最终影响端到端的latency)
如果读者想要在自己的机器上重现本次benchmark测试,可以参考 本次测试的配置和所使用的命令 。
实际上Kafka Distribution提供了producer性能测试工具,可通过 bin/kafka-producer-perf-test.sh
脚本来启动。所使用的命令如下
<span class="line">Producer</span> <span class="line">Setup</span> <span class="line">bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:<span class="number">2181</span> --create --topic <span class="built_in">test</span>-rep-one --partitions <span class="number">6</span> --replication-factor <span class="number">1</span></span> <span class="line">bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:<span class="number">2181</span> --create --topic <span class="built_in">test</span> --partitions <span class="number">6</span> --replication-factor <span class="number">3</span></span> <span class="line">Single thread, no replication</span> <span class="line">bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance <span class="built_in">test</span>7 <span class="number">50000000</span> <span class="number">100</span> -<span class="number">1</span> acks=<span class="number">1</span> bootstrap.servers=esv4-hcl198.grid.linkedin.com:<span class="number">9092</span> buffer.memory=<span class="number">67108864</span> batch.size=<span class="number">8196</span></span> <span class="line">Single-thread, async <span class="number">3</span>x replication</span> <span class="line">bin/kafktopics.sh --zookeeper esv4-hcl197.grid.linkedin.com:<span class="number">2181</span> --create --topic <span class="built_in">test</span> --partitions <span class="number">6</span> --replication-factor <span class="number">3</span></span> <span class="line">bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance <span class="built_in">test</span>6 <span class="number">50000000</span> <span class="number">100</span> -<span class="number">1</span> acks=<span class="number">1</span> bootstrap.servers=esv4-hcl198.grid.linkedin.com:<span class="number">9092</span> buffer.memory=<span class="number">67108864</span> batch.size=<span class="number">8196</span></span> <span class="line">Single-thread, sync <span class="number">3</span>x replication</span> <span class="line">bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance <span class="built_in">test</span> <span class="number">50000000</span> <span class="number">100</span> -<span class="number">1</span> acks=-<span class="number">1</span> bootstrap.servers=esv4-hcl198.grid.linkedin.com:<span class="number">9092</span> buffer.memory=<span class="number">67108864</span> batch.size=<span class="number">64000</span></span> <span class="line">Three Producers, <span class="number">3</span>x async replication</span> <span class="line">bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance <span class="built_in">test</span> <span class="number">50000000</span> <span class="number">100</span> -<span class="number">1</span> acks=<span class="number">1</span> bootstrap.servers=esv4-hcl198.grid.linkedin.com:<span class="number">9092</span> buffer.memory=<span class="number">67108864</span> batch.size=<span class="number">8196</span></span> <span class="line">Throughput Versus Stored Data</span> <span class="line">bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance <span class="built_in">test</span> <span class="number">50000000000</span> <span class="number">100</span> -<span class="number">1</span> acks=<span class="number">1</span> bootstrap.servers=esv4-hcl198.grid.linkedin.com:<span class="number">9092</span> buffer.memory=<span class="number">67108864</span> batch.size=<span class="number">8196</span></span> <span class="line">Effect of message size</span> <span class="line"><span class="keyword">for</span> i <span class="keyword">in</span> <span class="number">10</span> <span class="number">100</span> <span class="number">1000</span> <span class="number">10000</span> <span class="number">100000</span>;</span> <span class="line"><span class="keyword">do</span></span> <span class="line"><span class="built_in">echo</span> <span class="string">""</span></span> <span class="line"><span class="built_in">echo</span> <span class="variable">$i</span></span> <span class="line">bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance <span class="built_in">test</span> $((<span class="number">1000</span>*<span class="number">1024</span>*<span class="number">1024</span>/<span class="variable">$i</span>)) <span class="variable">$i</span> -<span class="number">1</span> acks=<span class="number">1</span> bootstrap.servers=esv4-hcl198.grid.linkedin.com:<span class="number">9092</span> buffer.memory=<span class="number">67108864</span> batch.size=<span class="number">128000</span></span> <span class="line"><span class="keyword">done</span>;</span> <span class="line">Consumer</span> <span class="line">Consumer throughput</span> <span class="line">bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:<span class="number">2181</span> --messages <span class="number">50000000</span> --topic <span class="built_in">test</span> --threads <span class="number">1</span></span> <span class="line"><span class="number">3</span> Consumers</span> <span class="line">On three servers, run:</span> <span class="line">bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:<span class="number">2181</span> --messages <span class="number">50000000</span> --topic <span class="built_in">test</span> --threads <span class="number">1</span></span> <span class="line">End-to-end Latency</span> <span class="line">bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency esv4-hcl198.grid.linkedin.com:<span class="number">9092</span> esv4-hcl197.grid.linkedin.com:<span class="number">2181</span> <span class="built_in">test</span> <span class="number">5000</span></span> <span class="line">Producer and consumer</span> <span class="line">bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance <span class="built_in">test</span> <span class="number">50000000</span> <span class="number">100</span> -<span class="number">1</span> acks=<span class="number">1</span> bootstrap.servers=esv4-hcl198.grid.linkedin.com:<span class="number">9092</span> buffer.memory=<span class="number">67108864</span> batch.size=<span class="number">8196</span></span> <span class="line">bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:<span class="number">2181</span> --messages <span class="number">50000000</span> --topic <span class="built_in">test</span> --threads <span class="number">1</span></span>1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253<spanclass="line">Producer</span><spanclass="line">Setup</span><spanclass="line">bin/kafka-topics.sh--zookeeperesv4-hcl197.grid.linkedin.com:<spanclass="number">2181</span>--create--topic<spanclass="built_in">test</span>-rep-one--partitions<spanclass="number">6</span>--replication-factor<spanclass="number">1</span></span><spanclass="line">bin/kafka-topics.sh--zookeeperesv4-hcl197.grid.linkedin.com:<spanclass="number">2181</span>--create--topic<spanclass="built_in">test</span>--partitions<spanclass="number">6</span>--replication-factor<spanclass="number">3</span></span> <spanclass="line">Singlethread,noreplication</span> <spanclass="line">bin/kafka-run-class.shorg.apache.kafka.clients.tools.ProducerPerformance<spanclass="built_in">test</span>7<spanclass="number">50000000</span><spanclass="number">100</span>-<spanclass="number">1</span>acks=<spanclass="number">1</span>bootstrap.servers=esv4-hcl198.grid.linkedin.com:<spanclass="number">9092</span>buffer.memory=<spanclass="number">67108864</span>batch.size=<spanclass="number">8196</span></span> <spanclass="line">Single-thread,async<spanclass="number">3</span>xreplication</span> <spanclass="line">bin/kafktopics.sh--zookeeperesv4-hcl197.grid.linkedin.com:<spanclass="number">2181</span>--create--topic<spanclass="built_in">test</span>--partitions<spanclass="number">6</span>--replication-factor<spanclass="number">3</span></span><spanclass="line">bin/kafka-run-class.shorg.apache.kafka.clients.tools.ProducerPerformance<spanclass="built_in">test</span>6<spanclass="number">50000000</span><spanclass="number">100</span>-<spanclass="number">1</span>acks=<spanclass="number">1</span>bootstrap.servers=esv4-hcl198.grid.linkedin.com:<spanclass="number">9092</span>buffer.memory=<spanclass="number">67108864</span>batch.size=<spanclass="number">8196</span></span> <spanclass="line">Single-thread,sync<spanclass="number">3</span>xreplication</span> <spanclass="line">bin/kafka-run-class.shorg.apache.kafka.clients.tools.ProducerPerformance<spanclass="built_in">test</span><spanclass="number">50000000</span><spanclass="number">100</span>-<spanclass="number">1</span>acks=-<spanclass="number">1</span>bootstrap.servers=esv4-hcl198.grid.linkedin.com:<spanclass="number">9092</span>buffer.memory=<spanclass="number">67108864</span>batch.size=<spanclass="number">64000</span></span> <spanclass="line">ThreeProducers,<spanclass="number">3</span>xasyncreplication</span><spanclass="line">bin/kafka-run-class.shorg.apache.kafka.clients.tools.ProducerPerformance<spanclass="built_in">test</span><spanclass="number">50000000</span><spanclass="number">100</span>-<spanclass="number">1</span>acks=<spanclass="number">1</span>bootstrap.servers=esv4-hcl198.grid.linkedin.com:<spanclass="number">9092</span>buffer.memory=<spanclass="number">67108864</span>batch.size=<spanclass="number">8196</span></span> <spanclass="line">ThroughputVersusStoredData</span> <spanclass="line">bin/kafka-run-class.shorg.apache.kafka.clients.tools.ProducerPerformance<spanclass="built_in">test</span><spanclass="number">50000000000</span><spanclass="number">100</span>-<spanclass="number">1</span>acks=<spanclass="number">1</span>bootstrap.servers=esv4-hcl198.grid.linkedin.com:<spanclass="number">9092</span>buffer.memory=<spanclass="number">67108864</span>batch.size=<spanclass="number">8196</span></span> <spanclass="line">Effectofmessagesize</span> <spanclass="line"><spanclass="keyword">for</span>i<spanclass="keyword">in</span><spanclass="number">10</span><spanclass="number">100</span><spanclass="number">1000</span><spanclass="number">10000</span><spanclass="number">100000</span>;</span><spanclass="line"><spanclass="keyword">do</span></span><spanclass="line"><spanclass="built_in">echo</span><spanclass="string">""</span></span><spanclass="line"><spanclass="built_in">echo</span><spanclass="variable">$i</span></span><spanclass="line">bin/kafka-run-class.shorg.apache.kafka.clients.tools.ProducerPerformance<spanclass="built_in">test</span>$((<spanclass="number">1000</span>*<spanclass="number">1024</span>*<spanclass="number">1024</span>/<spanclass="variable">$i</span>))<spanclass="variable">$i</span>-<spanclass="number">1</span>acks=<spanclass="number">1</span>bootstrap.servers=esv4-hcl198.grid.linkedin.com:<spanclass="number">9092</span>buffer.memory=<spanclass="number">67108864</span>batch.size=<spanclass="number">128000</span></span><spanclass="line"><spanclass="keyword">done</span>;</span> <spanclass="line">Consumer</span><spanclass="line">Consumerthroughput</span> <spanclass="line">bin/kafka-consumer-perf-test.sh--zookeeperesv4-hcl197.grid.linkedin.com:<spanclass="number">2181</span>--messages<spanclass="number">50000000</span>--topic<spanclass="built_in">test</span>--threads<spanclass="number">1</span></span> <spanclass="line"><spanclass="number">3</span>Consumers</span> <spanclass="line">Onthreeservers,run:</span><spanclass="line">bin/kafka-consumer-perf-test.sh--zookeeperesv4-hcl197.grid.linkedin.com:<spanclass="number">2181</span>--messages<spanclass="number">50000000</span>--topic<spanclass="built_in">test</span>--threads<spanclass="number">1</span></span> <spanclass="line">End-to-endLatency</span> <spanclass="line">bin/kafka-run-class.shkafka.tools.TestEndToEndLatencyesv4-hcl198.grid.linkedin.com:<spanclass="number">9092</span>esv4-hcl197.grid.linkedin.com:<spanclass="number">2181</span><spanclass="built_in">test</span><spanclass="number">5000</span></span> <spanclass="line">Producerandconsumer</span> <spanclass="line">bin/kafka-run-class.shorg.apache.kafka.clients.tools.ProducerPerformance<spanclass="built_in">test</span><spanclass="number">50000000</span><spanclass="number">100</span>-<spanclass="number">1</span>acks=<spanclass="number">1</span>bootstrap.servers=esv4-hcl198.grid.linkedin.com:<spanclass="number">9092</span>buffer.memory=<spanclass="number">67108864</span>batch.size=<spanclass="number">8196</span></span> <spanclass="line">bin/kafka-consumer-perf-test.sh--zookeeperesv4-hcl197.grid.linkedin.com:<spanclass="number">2181</span>--messages<spanclass="number">50000000</span>--topic<spanclass="built_in">test</span>--threads<spanclass="number">1</span></span>
broker配置如下
<span class="line"><span class="comment">############################# Server Basics #############################</span></span> <span class="line"><span class="comment"># The id of the broker. This must be set to a unique integer for each broker.</span></span> <span class="line">broker.id=<span class="number">0</span></span> <span class="line"><span class="comment">############################# Socket Server Settings #############################</span></span> <span class="line"><span class="comment"># The port the socket server listens on</span></span> <span class="line">port=<span class="number">9092</span></span> <span class="line"><span class="comment"># Hostname the broker will bind to and advertise to producers and consumers.</span></span> <span class="line"><span class="comment"># If not set, the server will bind to all interfaces and advertise the value returned from</span></span> <span class="line"><span class="comment"># from java.net.InetAddress.getCanonicalHostName().</span></span> <span class="line"><span class="comment">#host.name=localhost</span></span> <span class="line"><span class="comment"># The number of threads handling network requests</span></span> <span class="line">num.network.threads=<span class="number">4</span></span> <span class="line"><span class="comment"># The number of threads doing disk I/O</span></span> <span class="line">num.io.threads=<span class="number">8</span></span> <span class="line"><span class="comment"># The send buffer (SO_SNDBUF) used by the socket server</span></span> <span class="line">socket.send.buffer.bytes=<span class="number">1048576</span></span> <span class="line"><span class="comment"># The receive buffer (SO_RCVBUF) used by the socket server</span></span> <span class="line">socket.receive.buffer.bytes=<span class="number">1048576</span></span> <span class="line"><span class="comment"># The maximum size of a request that the socket server will accept (protection against OOM)</span></span> <span class="line">socket.request.max.bytes=<span class="number">104857600</span></span> <span class="line"><span class="comment">############################# Log Basics #############################</span></span> <span class="line"><span class="comment"># The directory under which to store log files</span></span> <span class="line">log.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs</span> <span class="line"><span class="comment"># The number of logical partitions per topic per server. More partitions allow greater parallelism</span></span> <span class="line"><span class="comment"># for consumption, but also mean more files.</span></span> <span class="line">num.partitions=<span class="number">8</span></span> <span class="line"><span class="comment">############################# Log Flush Policy #############################</span></span> <span class="line"><span class="comment"># The following configurations control the flush of data to disk. This is the most</span></span> <span class="line"><span class="comment"># important performance knob in kafka.</span></span> <span class="line"><span class="comment"># There are a few important trade-offs here:</span></span> <span class="line"><span class="comment"># 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.</span></span> <span class="line"><span class="comment"># 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).</span></span> <span class="line"><span class="comment"># 3. Throughput: The flush is generally the most expensive operation. </span></span> <span class="line"><span class="comment"># The settings below allow one to configure the flush policy to flush data after a period of time or</span></span> <span class="line"><span class="comment"># every N messages (or both). This can be done globally and overridden on a per-topic basis.</span></span> <span class="line"><span class="comment"># Per-topic overrides for log.flush.interval.ms</span></span> <span class="line"><span class="comment">#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000</span></span> <span class="line"><span class="comment">############################# Log Retention Policy #############################</span></span> <span class="line"><span class="comment"># The following configurations control the disposal of log segments. The policy can</span></span> <span class="line"><span class="comment"># be set to delete segments after a period of time, or after a given size has accumulated.</span></span> <span class="line"><span class="comment"># A segment will be deleted whenever *either* of these criteria are met. Deletion always happens</span></span> <span class="line"><span class="comment"># from the end of the log.</span></span> <span class="line"><span class="comment"># The minimum age of a log file to be eligible for deletion</span></span> <span class="line">log.retention.hours=<span class="number">168</span></span> <span class="line"><span class="comment"># A size-based retention policy for logs. Segments are pruned from the log as long as the remaining</span></span> <span class="line"><span class="comment"># segments don't drop below log.retention.bytes.</span></span> <span class="line"><span class="comment">#log.retention.bytes=1073741824</span></span> <span class="line"><span class="comment"># The maximum size of a log segment file. When this size is reached a new log segment will be created.</span></span> <span class="line">log.segment.bytes=<span class="number">536870912</span></span> <span class="line"><span class="comment"># The interval at which log segments are checked to see if they can be deleted according </span></span> <span class="line"><span class="comment"># to the retention policies</span></span> <span class="line">log.cleanup.interval.mins=<span class="number">1</span></span> <span class="line"><span class="comment">############################# Zookeeper #############################</span></span> <span class="line"><span class="comment"># Zookeeper connection string (see zookeeper docs for details).</span></span> <span class="line"><span class="comment"># This is a comma separated host:port pairs, each corresponding to a zk</span></span> <span class="line"><span class="comment"># server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".</span></span> <span class="line"><span class="comment"># You can also append an optional chroot string to the urls to specify the</span></span> <span class="line"><span class="comment"># root directory for all kafka znodes.</span></span> <span class="line">zookeeper.connect=esv4-hcl197.grid.linkedin.com:<span class="number">2181</span></span> <span class="line"><span class="comment"># Timeout in ms for connecting to zookeeper</span></span> <span class="line">zookeeper.connection.timeout.ms=<span class="number">1000000</span></span> <span class="line"><span class="comment"># metrics reporter properties</span></span> <span class="line">kafka.metrics.polling.interval.secs=<span class="number">5</span></span> <span class="line">kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter</span> <span class="line">kafka.csv.metrics.dir=/tmp/kafka_metrics</span> <span class="line"><span class="comment"># Disable csv reporting by default.</span></span> <span class="line">kafka.csv.metrics.reporter.enabled=<span class="literal">false</span></span> <span class="line">replica.lag.max.messages=<span class="number">10000000</span></span>1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495<spanclass="line"><spanclass="comment">############################# Server Basics #############################</span></span> <spanclass="line"><spanclass="comment"># The id of the broker. This must be set to a unique integer for each broker.</span></span><spanclass="line">broker.id=<spanclass="number">0</span></span> <spanclass="line"><spanclass="comment">############################# Socket Server Settings #############################</span></span> <spanclass="line"><spanclass="comment"># The port the socket server listens on</span></span><spanclass="line">port=<spanclass="number">9092</span></span> <spanclass="line"><spanclass="comment"># Hostname the broker will bind to and advertise to producers and consumers.</span></span><spanclass="line"><spanclass="comment"># If not set, the server will bind to all interfaces and advertise the value returned from</span></span><spanclass="line"><spanclass="comment"># from java.net.InetAddress.getCanonicalHostName().</span></span><spanclass="line"><spanclass="comment">#host.name=localhost</span></span> <spanclass="line"><spanclass="comment"># The number of threads handling network requests</span></span><spanclass="line">num.network.threads=<spanclass="number">4</span></span><spanclass="line"><spanclass="comment"># The number of threads doing disk I/O</span></span><spanclass="line">num.io.threads=<spanclass="number">8</span></span> <spanclass="line"><spanclass="comment"># The send buffer (SO_SNDBUF) used by the socket server</span></span><spanclass="line">socket.send.buffer.bytes=<spanclass="number">1048576</span></span> <spanclass="line"><spanclass="comment"># The receive buffer (SO_RCVBUF) used by the socket server</span></span><spanclass="line">socket.receive.buffer.bytes=<spanclass="number">1048576</span></span> <spanclass="line"><spanclass="comment"># The maximum size of a request that the socket server will accept (protection against OOM)</span></span><spanclass="line">socket.request.max.bytes=<spanclass="number">104857600</span></span> <spanclass="line"><spanclass="comment">############################# Log Basics #############################</span></span> <spanclass="line"><spanclass="comment"># The directory under which to store log files</span></span><spanclass="line">log.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs</span> <spanclass="line"><spanclass="comment"># The number of logical partitions per topic per server. More partitions allow greater parallelism</span></span><spanclass="line"><spanclass="comment"># for consumption, but also mean more files.</span></span><spanclass="line">num.partitions=<spanclass="number">8</span></span> <spanclass="line"><spanclass="comment">############################# Log Flush Policy #############################</span></span> <spanclass="line"><spanclass="comment"># The following configurations control the flush of data to disk. This is the most</span></span><spanclass="line"><spanclass="comment"># important performance knob in kafka.</span></span><spanclass="line"><spanclass="comment"># There are a few important trade-offs here:</span></span><spanclass="line"><spanclass="comment"># 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.</span></span><spanclass="line"><spanclass="comment"># 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).</span></span><spanclass="line"><spanclass="comment"># 3. Throughput: The flush is generally the most expensive operation. </span></span><spanclass="line"><spanclass="comment"># The settings below allow one to configure the flush policy to flush data after a period of time or</span></span><spanclass="line"><spanclass="comment"># every N messages (or both). This can be done globally and overridden on a per-topic basis.</span></span> <spanclass="line"><spanclass="comment"># Per-topic overrides for log.flush.interval.ms</span></span><spanclass="line"><spanclass="comment">#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000</span></span> <spanclass="line"><spanclass="comment">############################# Log Retention Policy #############################</span></span> <spanclass="line"><spanclass="comment"># The following configurations control the disposal of log segments. The policy can</span></span><spanclass="line"><spanclass="comment"># be set to delete segments after a period of time, or after a given size has accumulated.</span></span><spanclass="line"><spanclass="comment"># A segment will be deleted whenever *either* of these criteria are met. Deletion always happens</span></span><spanclass="line"><spanclass="comment"># from the end of the log.</span></span> <spanclass="line"><spanclass="comment"># The minimum age of a log file to be eligible for deletion</span></span><spanclass="line">log.retention.hours=<spanclass="number">168</span></span> <spanclass="line"><spanclass="comment"># A size-based retention policy for logs. Segments are pruned from the log as long as the remaining</span></span><spanclass="line"><spanclass="comment"># segments don't drop below log.retention.bytes.</span></span><spanclass="line"><spanclass="comment">#log.retention.bytes=1073741824</span></span> <spanclass="line"><spanclass="comment"># The maximum size of a log segment file. When this size is reached a new log segment will be created.</span></span><spanclass="line">log.segment.bytes=<spanclass="number">536870912</span></span> <spanclass="line"><spanclass="comment"># The interval at which log segments are checked to see if they can be deleted according </span></span><spanclass="line"><spanclass="comment"># to the retention policies</span></span><spanclass="line">log.cleanup.interval.mins=<spanclass="number">1</span></span> <spanclass="line"><spanclass="comment">############################# Zookeeper #############################</span></span> <spanclass="line"><spanclass="comment"># Zookeeper connection string (see zookeeper docs for details).</span></span><spanclass="line"><spanclass="comment"># This is a comma separated host:port pairs, each corresponding to a zk</span></span><spanclass="line"><spanclass="comment"># server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".</span></span><spanclass="line"><spanclass="comment"># You can also append an optional chroot string to the urls to specify the</span></span><spanclass="line"><spanclass="comment"># root directory for all kafka znodes.</span></span><spanclass="line">zookeeper.connect=esv4-hcl197.grid.linkedin.com:<spanclass="number">2181</span></span> <spanclass="line"><spanclass="comment"># Timeout in ms for connecting to zookeeper</span></span><spanclass="line">zookeeper.connection.timeout.ms=<spanclass="number">1000000</span></span> <spanclass="line"><spanclass="comment"># metrics reporter properties</span></span><spanclass="line">kafka.metrics.polling.interval.secs=<spanclass="number">5</span></span><spanclass="line">kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter</span><spanclass="line">kafka.csv.metrics.dir=/tmp/kafka_metrics</span><spanclass="line"><spanclass="comment"># Disable csv reporting by default.</span></span><spanclass="line">kafka.csv.metrics.reporter.enabled=<spanclass="literal">false</span></span> <spanclass="line">replica.lag.max.messages=<spanclass="number">10000000</span></span>
读者也可参考另外一份 Kafka性能测试报告