RabbitMQ 简单使用
RabbitMQ是消息的代理,最主要的作用就是接收消息并转发,可以把它看成一个邮局,而你要发送的消息就像是信件,你只需要在信件上填上收件人地址,然后把信件扔进邮筒,邮递员就会为你邮递信件。
所以也可以这么说 RabbitMQ 就是一个邮箱、邮局、快递员的组合体。
现在我们来看看最简单的队列模式,包含
producer我们把产生消息的实体称为 producer ,比如说投递邮件的人,文中用 P 来表示。
queue存放消息的地方称为 queue ,例如邮筒或者邮箱。
consumer消息的消费者,例如邮递员,文中用 C 表示。
生产者将需要传递的消息存入队列,然后另一端的消费者使用该消息
Maven
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.0</version> </dependency>
Gradle
dependencies { compile 'com.rabbitmq:amqp-client:4.0.0' }
我们创建一个 Send 类,该类连接到 RabbitMQ ,发送一条消息,然后退出
public class Send { private static final Logger LOG = LoggerFactory.getLogger(Send.class); private Channel channel; private Connection connection; private static final String QUEUE_NAME = "hello_world"; private static final String host = "localhost"; public Send() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); try { this.connection = factory.newConnection(); this.channel = connection.createChannel(); } catch (IOException e) { LOG.error(e.getMessage(), e); } catch (TimeoutException e) { LOG.error(e.getMessage(), e); } } public void producer(String message) { try { channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException e) { LOG.error(e.getMessage(), e); } } public void close() { try { this.channel.close(); this.connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, TimeoutException { Send send = new Send(); send.producer("Hello World"); send.close(); }
}
从队列中获取消息,会持有对队列消息的监听
public class Recv { private final static String QUEUE_NAME = "hello_world"; private static final String host = "localhost"; public static void consume() throws java.io.IOException, java.lang.InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { consume(); }
}
不出意外的话,会在控制台打印 [x] Received Hello World
先开启队列服务。
如果在队列服务器中不存在定义的队列名称,则会自动创建。
发送的消息如果没有被消费,会一直存在队列中,例如我们可以一直运行 Send ,会发现队列中的 Ready 消息会累加,当运行 Recv 后,消费掉消息才会进行清除。
Waring存在的问题:如果消息消费者在消费信息的过程中出现意外,( its channel is closed, connection is closed, or TCP connection is lost ),那么消息可能还没处理完,但是队列服务器中该消息已经不存在了,就相当于邮递员给你送信,但是送信的途中发生了意外,那么你的信就丢失了。
上面的队列存在问题,由于 ack == true
,当生产者发送的消息到达消费者端时,那么该条消息就立刻从内存中清除掉了,而不管你消费者是否已经成功处理,比如生产者发送了 100 条消息,当消费者连接上后,这 100 条消息就被从内存中清除,而消费者可能仅仅只成功处理了 10 条。
解决上述问题,我们需要在消息被成功处理后,手动告知队列,我们已经处理好了,现在可以清除了。如果处理的过程中失败了,那么这条消息会再次进行处理,直到成功处理为止。
更改上述代码
生产者将队列名称改成
private static final String QUEUE_NAME = "hello_world_ack_false";
其他保持不变。
消费者:
从队列中获取消息,会持有对队列消息的监听
public class Recv { private final static String QUEUE_NAME = "hello_world_ack_false"; private static final String host = "localhost"; public static void consume() throws java.io.IOException, java.lang.InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); //设置每个消费者同时只处理一条消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); channel.basicAck(envelope.getDeliveryTag(), false); // 在消息被正确处理后,手动 ack } }; channel.basicConsume(QUEUE_NAME, false, consumer);// 自动 ack 设置为 false } public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { consume(); }
}
然后再次进行运行,会发现,如果在消息处理阶段出现异常导致没处理成功时,在消息队列中,这条消息还将存在,标志位 Unacked,如下图。
以上我们就完成了 RabbitMQ 的最简单的使用,利用这个我们可以在 新用户注册时候发送邮件通知,或者秒杀活动修改订单数据等场景下使用。
当然 RabbitMQ 的更高级特性,例如 发布/订阅 ,Topic 我们以后有时间可以去了解,当然也不难的。
比如我们可以利用 Topic 搭建日志系统,将 error 输出在文件中,将 info 打印在控制台等等。
关于更高级点的功能,下次再一探究竟。
本文参照: RabbitMQ HelloWorld 详解