测试环境:ubuntu 15.10 64位
cpu:Intel Core i7-4790 3.60GHz * 8
内存:16GB
硬盘:ssd 120GB
软件环境:rabbitmq 3.6.0 kafka 0.8.1 (均为单机本机运行)
测试结果:
kafka :消费速度: 37,586 /s 生产速度: 448,753 /s
rabbitmq: 消费速度: 20,807 /s 生产速度: 16,413 /s
出现问题:
rabbitmq 生产4分钟左右出现队列阻塞,无法继续添加数据,1分钟后恢复,再过大约1分钟又出现此现象并以约1分钟为间隔出现此问题。
rabbitmq 生产对象时有不小的几率(约 1/20)添加队列失败,报出的错误是“TCP连接重置”
其他并无任何问题
结论:
可以很明显地看出kafka的性能远超rabbitmq。不过这也是理所当然的,毕竟2个消息队列实现的协议是不一样的,处理消息的场景也大有不同。rabbitmq适合处理一些数据严谨的消息,比如说支付消息,社交消息等不能丢失的数据。kafka是批量操作且不保证数据能完整地到达消费者端,所以适合一些大量的营销消息的场景。
代码:
kafka:
package main import ( "github.com/Shopify/sarama" "os" "os/signal" "sync" "log" "time" ) func main() { go producer() // go consumer() time.Sleep(10*time.Minute) } func producer() { config :=sarama.NewConfig() config.Producer.Return.Successes = true proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config) if err != nil { panic(err) } signals :=make(chan os.Signal,1) signal.Notify(signals,os.Interrupt) var ( wg sync.WaitGroup enqueued, successes, errors int ) wg.Add(1) go func() { defer wg.Done() for _=range proder.Successes(){ successes++ } }() wg.Add(1) go func() { defer wg.Done() for err := range proder.Errors(){ log.Println(err) errors++ } }() go func() { t1 := time.NewTicker(time.Second) for{ <- t1.C log.Println(enqueued) } }() ProducerLoop: for{ message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")} select { case proder.Input() <- message: enqueued++ case <- signals: proder.AsyncClose() break ProducerLoop } } wg.Wait() log.Println("Successfully produced:%d;errors:%d/n",successes,errors) } func consumer() { coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil) if err != nil { panic(err) } defer func() { if err :=coner.Close(); err !=nil{ log.Fatalln(err) } }() partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest) if err != nil { panic(err) } defer func() { if err := partitionConsumer.Close();err!=nil{ log.Fatalln(err) } }() signals := make(chan os.Signal,1) signal.Notify(signals,os.Interrupt) consumed:=0 go func() { t1 := time.NewTicker(time.Second) for{ <- t1.C log.Println(consumed) } }() ConsumerLoop: for{ select { case _ = <-partitionConsumer.Messages(): consumed++ // log.Println( string(msg.Value)," => ",consumed) case <-signals: break ConsumerLoop } } log.Printf("Consumed: %d/n", consumed) }
rabbitmq:
package main import ( "github.com/streadway/amqp" "time" "fmt" "log" ) const ( queueName = "push.msg.q" exchange = "t.msg.ex" mqurl ="amqp://shimeng:shimeng1015@192.168.155.106:5672/push" ) var conn *amqp.Connection var channel *amqp.Channel func main() { fmt.Println(1) // push() receive() // fmt.Println("end") // close() } func failOnErr(err error, msg string) { if err != nil { log.Fatalf("%s:%s", msg, err) panic(fmt.Sprintf("%s:%s", msg, err)) } } func mqConnect() { var err error conn, err = amqp.Dial(mqurl) if err != nil { log.Println(1) log.Fatalln(err) } fmt.Println(5) channel, err = conn.Channel() if err != nil { fmt.Println(2) log.Fatalln(err) }else { fmt.Println("a") } } func push() { count := 0 if channel == nil { fmt.Println(2) mqConnect() }else { fmt.Println(3) } msgContent := "hello world!" t1 := time.NewTicker(time.Second) go func() { for{ <- t1.C log.Println(count) } }() for{ err := channel.Publish(exchange, "test", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msgContent), }) if err != nil { }else { count ++ } } } func receive() { if channel == nil { mqConnect() } count :=0 msgs, err := channel.Consume(queueName, "", true, false, false, false, nil) failOnErr(err, "") forever := make(chan bool) t1 := time.NewTicker(time.Second) go func() { for{ <- t1.C log.Println(count) } }() go func() { //fmt.Println(*msgs) for _= range msgs { count ++ // s := BytesToString(&(d.Body)) // count++ // fmt.Printf("receve msg is :%s -- %d/n", *s, count) } }() fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C/n") <-forever }