1. Getting Started
1.1 Introduction
kafka是一个分布式的、可分区的、可备份提交数据的系统,它提供了一个消息系统,但是有自己独特的设计。这些意味着什么?
首先 我们看看kafka中的一些基本术语:
1 kafka根据topics来对消息进行分类 2 我们把发送消息到topic中的进程叫producers 3 我们把从topic中订阅消息的进程叫consumers 4 kafka以集群模式运行时,集群中包含一个或者多个kafka服务,每个服务称为一个broker
因此,在一个较高的角度来看,producers通过网络发送消息到kafka集群,consumer来订阅kafka集群中的消息,架构如下:客户端和服务端之间通讯是非常简单高性能的,采用TCP协议。
一个topic是一个类别,对于每个topic,kafka集群维护一个partitioned log,就像如下:
每个partition都是有序的,每个partition在操作系统存储层面都是append log,每个发布到partiton的消息都是直接追加到文件末尾的,每个消息在partition中都有一个唯一的id叫做offset,offset标记了消息在partition中的位置。
kafka集群在一个配置的消息生存期内,保留所有发布的消息,而不论该消息是否被消费掉。例如:一个消息被设置为保存两天,那么两天过后该消息会被清除掉,而不管该消息是否被消费。kafka通过这种方式来释放空间,kafka的性能并不会因为保存更多的日志而性能底下,实际上每个consumer只会维护一些消费消息的offset,这个offset是被consumer控制的,正常情况下offset一般会线性增长,这样消息是被顺序消费的,但是事实上consumer可以以任意的顺序来消费消息,consumer只要将offset设置为任意值即可。
这样的设计意味着kafka的consumer是非常轻量级的,它们的加入和离开不会被影响到kafka集群中的其他consumers和producers,consumer和producers的状态信息都是存储在zookeeper中的。
在log server中partitions的设计有多个原因,首先,通过partition可以使log文件的大小超过单台机器的容量限制,每个单独的partition的个体必须不超过本机的容量大小,但是一个topic可以有多个partitions,因此可以存储任意数量的数据。其次,可以提升并发消费的能力。
一个topic的partitions可以被分布在kafka集群中的多台机器上,每个机器上的kafka实例负责该机器上partition中数据的读写操作,每个partition可以配置被复制的份数,这些复制的份数会被复制到其他机器上,以提升高可用性。每个partition有一个server作为leader,0个或者多个server作为followers,leader处理所有的读写请求,followers只是跟进leader,进行消息同步。如果leader失效了,会有一个followers自动的升级为新的leader。每个partitions都会有一个leader和一些follower,kafka会将leader均衡的分布在集群中,因此负载会比较均衡一些。
producers把消息发送到它选定的topic中,producers还能指定发送到topic的哪个partition中,你可以通过round-robin或者其他算法来决定把消息发送到哪个partition中。
消息传统上有两种模式:queuing和publish-subscribe。在queue模式下,一个pool中的consumer从server上读取消息,消息会被他们中的一个消费掉。在publish-subscribe模式下,消息被广播到所有的consumers中。kafka提供了一个单一的抽象概念来包含了这两个模式,即consumer group。每个consumer属于一个consumer group中,每个group中可以有多个consumer,消息只能被consumer group中的某一个consumer消费掉。
consumer label标记他们属于哪个consumer group,每个消息被发送到topic后都会被传输到每个consumer group中一个consumer实例上面。consumer实例可以分布在不同的进程中或者分布在不同的机器上。
如果所有的consumer实例都在一个consumer group中,这种模式就像传统的queue一样,在所有的consumer之间平衡负载,如果所有的consumer实例分布在不同的consumer group中,这种模式就像publish-subscribe,所有的消息都会广播到所有的consumer上。
在更多的情况下,我们的topics中会包含几个consumer group,每个consumer group都是一个logical subscribe,每个group包含了多个consumer实例,这样更具扩展性和容错性。kafka有比传统消息系统更强健的订阅机制。
kafka在较高的层次给予了以下保证:
1 producers发送到topic partition中的消息会被顺序的追加到日志中。 2 consumer实例消费消息的顺序和消息在日志中的顺序一致 3 如果一个topic有N份复制,则我们可以允许N-1份服务故障,而不会丢失任何消息。1.2 Use Cases
以下是kafka的一些常用应用场景:
Messaging Website Activity Tracking Metrics Log Aggregation Stream Processing Event Sourcing Commit Log1.3 Quick Start
1: Download the code
tar -xzf kafka_2.9.2-0.8.1.1.tgz cd kafka_2.9.2-0.8.1.1
2: Start the server,启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka
bin/kafka-server-start.sh config/server.properties
3: Create a topic,创建一个topic,名为test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
列出已经创建的所有topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
4: Send some messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
5: Start a consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
6: Setting up a multi-broker cluster
在一台机器上运行三个kafka实例,构成一个kafka集群:
1 拷贝现有的配置文件,修改配置文件的broker.id、port、log.dir
cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties
2 启动其余两个节点
bin/kafka-server-start.sh config/server-1.properties & bin/kafka-server-start.sh config/server-2.properties &
3 创建一个新的topic,replication是3
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
我们创建了集群,我们可以通过以下命令查看每个broker在做什么
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
4 发送一些消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
5 消费一些消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic1.4 Ecosystem
值得一看,有很多有用的工具