注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。
在上一篇文章 RabbitMQ 基础教程(1) - Hello World 中,我们已经简单的介绍了 RabbitMQ
以及如何发送和接收一个消息。接下来我们将继续深入 RabbitMQ
,研究一下 消息队列(Work Queue)
消息的 发布者 发布一个消息到 消息队列 中,然后信息的 消费者 取出消息进行消费。
queue +-------------+ +--+--+--+--+--+--+ +-------------+ | producer |----->|m1|m2| ... | | |---->| consumer | +-------------+ +--+--+--+--+--+--+ +-------------+
但是实际情况往往比这个要复杂,假如我们有多个信息的发布者和多个信息的消费者,那 RabbitMQ
又将会是怎么工作呢?
+--------------+ +--------------+ | producer1 +- / | consumer1 | +--------------+ /- queue /- +--------------+ +--------------+ /- +---+---+---+----+ /- +--------------+ | producer2 +---->X|m1 |m2 |m3 |... |/---->| consumer2 | +--------------+ /- +---+---+---+----+ /- +--------------+ +--------------+ /- /- +--------------+ | ... |/ / | ... | +--------------+ +--------------+
RabbitMQ
中,如果有多个消费者同时消费同一个消息队列,那么就通过 Round-robin
算法将消息队列中的消息均匀的分配给每一个消费者。
这个算法其实很简单,每收到一个新的消息,就将这个消息分发给上下一个消费者。比如上一个消费者是 consumer-n
,那么有新消息来的时候就将这个新的消息发布到 consumer-n+1
,以此类推,如果到了最后一个消费者,那么就又从第一个开始。即: consumer-index = (consumer-index + 1) mod consumer-number
为了演示,首先来做几项准备工作。
定义任务 task.js
/** * 创建一个任务 * @param taskName 任务名字 * @param costTime 任务话费的时间 * @param callback 任务结束以后的回调函数 * @constructor */ function Task(taskName ,costTime , callback){ if(typeof(costTime) !== 'number') costTime = 0; // no delay there setTimeout(function () { console.log(taskName+" finished"); if(callback && typeof (callback) === 'function') callback(); } , 1000*costTime); };
任务发布者负责将该结构发布到队列中,然后消费者取出消息,新建任务开始执行。
{ taskName : 'taskname', costTime : 1 }
创建任务消息 task-producer.js
var amqp = require('amqplib/callback_api'); // 连接上RabbitMQ服务器 amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'tasks'; // 得到发送消息的数目,默认发送4个 var name; var cost; (function () { if(process.argv.length < 4 ) { console.error('ERROR : usage - node rabbit-producer <taskname> <costtime>'); process.exit(-1); } name = process.argv[2]; cost = +process.argv[3]; })(); // 新建队列,然后将队列中的消息持久化取消 ch.assertQueue(q, {durable: true}); // 将任务串行化存入Buffer中,并推入队列 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true}); console.log(" [x] Sent "+name); setTimeout(function () { process.exit(0); },500); }); });
消费任务消息 task-consumer.js
var amqp = require('amqplib/callback_api'); var Task = require('./task.js'); amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'tasks'; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 监听队列上面的消息 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString('utf8')); console.log('Get the task '+obj.taskName); // 定义新的任务 new Task(obj.taskName,obj.costTime); }, {noAck: true}); }); });
现在开启两个消费者进程来等待消费 tasks
队列中的消息
# shell1 node task-consumer.js # shell2 node task-consumer.js
然后向队列中推入三个消息
# shell3 node task-producer.js task1 0 node task-producer.js task2 0 node task-producer.js task3 0
运行结果
# shell1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 task1 finished Get the task task3 task3 finished # shell2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished # 已经通过Round-robin算法将消息队列中的消息分配到连接的消费者中了.
细心的读者可能已经发现了我们在 声明队列 和 发送消息 的代码块中改动了一小部分的代码,那就是
// 声明队列 ch.assertQueue(q, {durable: true}); // 发送信息 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});
通过将队列的 durable
配置参数生命为 true
可以保证在 RabbitMQ
服务器退出或者异常终止的情况下不会丢失 消息队列 ,注意这里只是不会丢失消息队列,并不是消息队列中没有被消费的 消息 不会丢失。
为了保证消息队列中的 消息 不会丢失,就需要在发送消息时指定 persistent
选项,这里并不能百分之百的保证消息不会丢失,因为从队列中有新的消息,到将队列中消息持久化到磁盘这一段时间之内是无法保证的。
现在存在这样一种场景,消费者取到消息,然后创建任务开始执行。但是任务执行到一半就抛出异常,那么这个任务算是没有被成功执行的。
在我们之前的代码实现中,都是消息队列中有新的消息,马上就这个消息分配给消费者消费,不管消费者对消息处理结果如何,消息队列会马上将已经分配的消息从消息队列中删除。如果这个任务非常重要,或者一定要执行成功,那么一旦任务在执行过程中抛出异常,那么这个任务就再也找不回来了,这是非常可怕的事情。
还好在 RabbitMQ
中我们可以为已经分配的消息和消息队列之间创建一个应答关系:
要在消费者和消息队列之间建立这种应答关系我们只需要将 channel
的 consume
函数的 noAck
参数设成 false
就可以了。
ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString('utf8')); console.log('Get the task '+obj.taskName); // 定义新的任务 new Task(obj.taskName,obj.costTime); }, {noAck: false}); // 这里设置成false
下面我们就模拟一下消息处理失败的场景:
var amqp = require('amqplib/callback_api'); var Task = require('./task.js'); amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'tasks'; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 监听队列上面的消息 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString('utf8')); console.log('Get the task '+obj.taskName); // 定义新的任务 new Task(obj.taskName,obj.costTime,function(){ if(obj.taskName === 'task2') throw new Error("Test error"); else ch.ack(msg); }); // 如果是任务二,那么就抛出异常。 }, {noAck: false}); }); });
按照上面的脚本执行顺序,我们在执行一遍脚本: consumer2
得到执行 task2
消息,然后马上抛出异常退出进行,然后消息队列再将这个消息分配给 cosumer1
,接着也执行失败了,退出进程,最终消息队列中将只会有一个 task2
的消息存在。
启动消费者等待消息
# shell1 开启消费者1 node rabbit-consumer.js # shell2 开启消费者2 node rabbit-consumer.js
创建消息
node rabbit-producer.js task1 0 node rabbit-producer.js task2 10 node rabbit-producer.js task3 0
我们能来看一下结果:
# shell2 消费者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished # 消费者2执行任务2的时候抛出异常,task2将会重新发送给消费者1 ... throw new Error('Error test'); # shell1 消费者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 task1 finished Get the task task3 task3 finished Get the task task2 # 消费者1接收到任何2 task2 finished ... throw new Error('Error test'); # 也抛出异常了
最终会在消息队列中剩下一条未消费的信息。
这里有一点需要注意,如果你将 noAck
选项设置成了 false
,那么如果消息处理成功,一定要进行应答,负责消息队列中的消息会越来越多,直到撑爆内存。
在上文中我们听到过消息队列通过 Round-robin
算法来将消息分配给消费者,但是这个分配过程是盲目的。比如现在有两个消费者, consumer1
和 consumer2
,按照 Round-robin
算法就会将 奇数 编号的任务发配给 consumer1
,将 偶数 编号的任务分配给 consumer2
,但是这些任务恰好有一个特性, 奇数 编号的任务比较繁重,而 偶数 编号的任务就比较简单。
那么这就会造成一个问题,那就是 consumer1
会被累死,而 consumer2
会被闲死。造成了负载不均衡。要是每一个消息都被成功消费以后告诉消息队列,然后消息队列再将新的消息分配给空闲下来的消费者不就好了。
RabbitMQ
中的确有这样的一个配置选项。那就是 ch.prefetch(1);
我们现在就来模拟一下
var amqp = require('amqplib/callback_api'); var Task = require('./task.js'); amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'tasks'; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 监听队列上面的消息 ch.prefetch(1); // 添加这一行 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString('utf8')); console.log('Get the task '+obj.taskName); new Task(obj.taskName,obj.costTime ,function () { ch.ack(msg); }); }, {noAck: false}); }); });
启动消费者等待消息
# shell1 开启消费者1 node rabbit-consumer.js # shell2 开启消费者2 node rabbit-consumer.js
创建消息
node rabbit-producer.js task1 0 node rabbit-producer.js task2 20 node rabbit-producer.js task3 0 node rabbit-producer.js task4 20
# shell1 开启消费者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 # 任务马上结束 task1 finished Get the task task3 # 任务马上结束 task3 finished Get the task task4 # 任务四被分配到consumer1中了 task4 finished # shell2 开启消费者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished