转载

[RabbitMQ]队列

RabbitMQ是一个message broker,本质上就是接收producer的消息,传递给consumer,其中可以根据给定的需要设置消息路由、缓存、持久化。

简单队列

最简单的队列如下图,producer将消息推送到queue中,queue将消息传递给consumer。可以有多个producer向同一个queue推送消息,也可以有多个consumer从queue接收消息。

[RabbitMQ]队列

用Java开发producer和consumer需要下载 Java client library 。

producer代码如下:

// Send.java  import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;  public class Send {    private final static String QUEUE_NAME = "SimpleQueue";    public static void main(String[] argv) throws Exception {     ConnectionFactory factory = new ConnectionFactory();     factory.setHost("127.0.0.1");     Connection connection = factory.newConnection();     Channel channel = connection.createChannel();      channel.queueDeclare(QUEUE_NAME, false, false, false, null);     String message = "Hello World!";     channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));     System.out.println(" [x] Sent '" + message + "'");      channel.close();     connection.close();   } }

consumer代码如下:

// Recv.java  import com.rabbitmq.client.*;  import java.io.IOException;  public class Recv {    private final static String QUEUE_NAME = "SimpleQueue";    public static void main(String[] argv) throws Exception {     ConnectionFactory factory = new ConnectionFactory();     factory.setHost("127.0.0.1");     Connection connection = factory.newConnection();     Channel channel = connection.createChannel();      channel.queueDeclare(QUEUE_NAME, false, false, false, null);     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");      Consumer consumer = new DefaultConsumer(channel) {       @Override       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)           throws IOException {         String message = new String(body, "UTF-8");         System.out.println(" [x] Received '" + message + "'");       }     };     channel.basicConsume(QUEUE_NAME, true, consumer);   } }

编译并运行:

javac -cp rabbitmq-client.jar Send.java Recv.java  java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send  java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

运行结果如下:

[RabbitMQ]队列

任务队列(task quque)

任务队列是queue将消息分发给多个consumer进行处理。

[RabbitMQ]队列

// Task.java  import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;  public class Task {    private final static String QUEUE_NAME = "TaskQueue";    public static void main(String[] argv) throws Exception {     ConnectionFactory factory = new ConnectionFactory();     factory.setHost("127.0.0.1");     Connection connection = factory.newConnection();     Channel channel = connection.createChannel();      channel.queueDeclare(QUEUE_NAME, false, false, false, null);     String message = argv[0];     channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));     System.out.println(" [x] Sent '" + message + "'");      channel.close();     connection.close();   } }
// Worker.java  import com.rabbitmq.client.*;  import java.io.IOException;  public class Worker {    private final static String QUEUE_NAME = "TaskQueue";    public static void main(String[] argv) throws Exception {     ConnectionFactory factory = new ConnectionFactory();     factory.setHost("127.0.0.1");     Connection connection = factory.newConnection();     Channel channel = connection.createChannel();      channel.queueDeclare(QUEUE_NAME, false, false, false, null);     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");      Consumer consumer = new DefaultConsumer(channel) {       @Override       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)           throws IOException {         String message = new String(body, "UTF-8");         System.out.println(" [x] Received '" + message + "'");          try {           doWork(message);         } finally {           System.out.println(" [x] Done");         }       }     };     channel.basicConsume(QUEUE_NAME, true, consumer);   }    private static void doWork(String task) throws InterruptedException {     try {       Thread.sleep(1000);     } catch (InterruptedException _ignored) {       Thread.currentThread().interrupt();     }   } }

编译代码:

javac -cp rabbitmq-client.jar Task.java Worker.java

分别打开三个console,其中两个运行Worker:

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker

另一个console运行Task发送消息:

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Task msg

运行结果如下:

[RabbitMQ]队列

[RabbitMQ]队列

[RabbitMQ]队列

使用任务队列可以实现并行任务处理。默认情况下,RabbitMQ会按消息的顺序依次分发给consumer,也就是说平均每个consumer会接收相同数量的消息。这种消息分发方式称为循环调度(round-robin dispatching)。

消息反馈(message acknowledgment)

如果consumer在执行某个消息的任务过程中失败了,会怎么样呢?在当前模式下,RabbitMQ一旦将消息发送给了consumer就从内存中删除该消息,这种情况下,如果杀掉了worker,那么分配给该worker的未处理的和正在处理中的消息就会丢失。但是我们肯定不希望任务丢失,我们希望一个worker挂掉之后,它未处理完成的任务将分配给其他worker处理。

为了确保消息不会丢失,RabbitMQ支持消息反馈。消息反馈就是consumer告诉RabbitMQ某个消息已经被接收并处理完成了,可以从队列中删除了。

如果一个consumer挂掉了(channel或connection被关闭,或者TCP连接丢失),没有发送反馈(ack),那么RabbitMQ就知道消息没有被处理完成,就把它重新放入队列。这时如果有其他consumer在线,那么就把这个消息重新分发。这样就确保在worker挂掉时消息也不会丢失了。

消息处理是没有超时机制的。RabbitMQ会在consumer挂掉时才重新分发消息。因此,即使消息处理时间很长很长都没有关系。

默认消息反馈是开启的。在上面的例子中通过指定 autoAck=true 关闭了反馈。消息反馈的代码如下:

final Consumer consumer = new DefaultConsumer(channel) {   @Override   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {     String message = new String(body, "UTF-8");      System.out.println(" [x] Received '" + message + "'");     try {       doWork(message);     } finally {       System.out.println(" [x] Done");       channel.basicAck(envelope.getDeliveryTag(), false);     }   } };

PS:可以使用rabbitmqctl查看未反馈的消息。

rabbitmqctl list_queues name messages_ready messages_unacknowledged

消息持久化

之前的消息反馈可以确保consumer挂掉时消息不会丢失,但是如果RabbitMQ服务器停止或者崩溃了,队列和其中的消息就都丢失了。要确保服务器上的消息不丢失,需要将队列和消息都标识成持久化的。

首先,确保RabbitMQ不会丢失队列:

boolean durable = true; channel.queueDeclare("TaskQueue", durable, false, false, null);

尽管这代码是正确的,但是目前的环境下却不起作用。因为之前已经声明了一个非持久化的队列TaskQueue。RabbitMQ不允许用不同的参数重新定义已经存在的队列。换一个新的队列名称就行了。

boolean durable = true; channel.queueDeclare("DurableTaskQueue", durable, false, false, null);

这样RabbitMQ重启后队列DurableTaskQueue是仍然存在的。接下来需要将消息标识成持久化的,通过设置属性MessageProperties为PERSISTENT_TEXT_PLAIN:

import com.rabbitmq.client.MessageProperties;  channel.basicPublish("", "task_queue",             MessageProperties.PERSISTENT_TEXT_PLAIN,             message.getBytes());

PS:将消息标识成持久化不能完全保证消息不会丢失。RabbitMQ在接收到消息写入磁盘之间仍有很短的时间间隔,而且RabbitMQ不会每次保存消息时都调用fsync,因此可能消息只被写入了缓存而还没有写入磁盘。如果需要更强的持久化方案,可以使用publisher confirm。

公平分配(fair dispatch)

默认的循环调度方案可能会造成负载不平均。例如,有两个worker,偶数号的任务处理很耗时,而奇数号的任务处理很快,那么就会导致一个worker处理很慢,积压很多任务,而另一个worker处理很快,比较空闲。这是因为RabbitMQ在消息进入队列后就立刻分发出去,它不关心consumer中未反馈的消息数量,只是简单地将第n个消息分发给第n个consumer。

为了让每个consumer的负载均匀,通过basicQos方法设置参数prefetchCount为1。这样,consumer中的消息将不超过一条,当consumer处理完并发送反馈后RabbitMQ才会向它分发新的消息。RabbitMQ在接收到新消息后,如果有空闲的consumer就将消息发给它,否则就一直等待有空闲的consumer再分发。

int prefetchCount = 1; channel.basicQos(prefetchCount);

PS:如果所有的任务都很耗时,worker都很忙,那么可能会导致队列被塞满,这时需要增加worker,或者采用其他策略。

最后的完整代码如下:

// Task.java  import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties;  public class Task {    private static final String TASK_QUEUE_NAME = "task_queue";    public static void main(String[] argv) throws Exception {     ConnectionFactory factory = new ConnectionFactory();     factory.setHost("localhost");     Connection connection = factory.newConnection();     Channel channel = connection.createChannel();      channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);      String message = argv[0];      channel.basicPublish("", TASK_QUEUE_NAME,         MessageProperties.PERSISTENT_TEXT_PLAIN,         message.getBytes("UTF-8"));     System.out.println(" [x] Sent '" + message + "'");      channel.close();     connection.close();   }
// Worker.java  import com.rabbitmq.client.*;  import java.io.IOException;  public class Worker {    private static final String TASK_QUEUE_NAME = "task_queue";    public static void main(String[] argv) throws Exception {     ConnectionFactory factory = new ConnectionFactory();     factory.setHost("localhost");     final Connection connection = factory.newConnection();     final Channel channel = connection.createChannel();      channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");      int prefetchCount = 1;     channel.basicQos(prefetchCount);      final Consumer consumer = new DefaultConsumer(channel) {       @Override       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {         String message = new String(body, "UTF-8");          System.out.println(" [x] Received '" + message + "'");         try {           doWork(message);         } finally {           System.out.println(" [x] Done");           channel.basicAck(envelope.getDeliveryTag(), false);         }       }     };     channel.basicConsume(TASK_QUEUE_NAME, false, consumer);   }    private static void doWork(String task) {     try {       Thread.sleep(1000);     } catch (InterruptedException _ignored) {       Thread.currentThread().interrupt();     }   } }

参考资料:

RabbitMQ Introduction

RabbtMQ Work Queues
原文  http://www.cnblogs.com/w1991/p/5178701.html
正文到此结束
Loading...