本章我们来一次快速入门RabbitMQ——生产者与消费者。需要构建一个生产端与消费端的模型。什么意思呢?我们的生产者发送一条消息,投递到RabbitMQ集群也就是Broker。 我们的消费端进行监听RabbitMQ,当发现队列中有消息后,就进行消费。
本次整合主要采用SpringBoot框架,需要对SpringBoot的使用有一定了解。
我们来看下大概步骤:
这个连接工厂需要配置一些相应的信息,例如: RabbitMQ节点的地址,端口号,VirtualHost等等。 Channel是我们RabbitMQ所有消息进行交互的关键。
/** * * @ClassName: ConnectionUtils * @Description: 连接工具类 * @author Coder编程 * @date 2019年6月21日 上午22:28:22 * */ public class ConnectionUtils { public static Connection getConnection() throws IOException, TimeoutException { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("127.0.0.1"); //端口 factory.setPort(5672);//amqp协议 端口 类似与mysql的3306 //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/vhost_cp"); factory.setUsername("user_cp"); factory.setPassword("123456"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection; } } 复制代码
/** * * @ClassName: Producer * @Description: 生产者 * @author Coder编程 * @date 2019年7月30日 上午21:04:43 * */ public class Producer { public static void main(String[] args) throws Exception { System.out.println("Producer start..."); //1 创建ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2 通过connection创建一个Channel Channel channel = connection.createChannel(); //3 通过Channel发送数据 for(int i=0; i < 5; i++){ String msg = "Hello RabbitMQ!"; //1 exchange 2 routingKey channel.basicPublish("", "test001", null, msg.getBytes()); } //4 记得要关闭相关的连接 channel.close(); connection.close(); } } 复制代码
/** * * @ClassName: Consumer * @Description: 消费端 * @author Coder编程 * @date 2019年7月30日 上午21:08:12 * */ public class Consumer { public static void main(String[] args) throws Exception { System.out.println("Consumer start..."); //1 创建ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2通过connection创建一个Channel Channel channel = connection.createChannel(); //3声明(创建)一个队列 String queueName = "test001"; channel.queueDeclare(queueName, true, false, false, null); //4创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5设置Channel channel.basicConsume(queueName, true, queueingConsumer); while(true){ //6 获取消息 Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费端: " + msg); //Envelope envelope = delivery.getEnvelope(); } } } 复制代码
channel.queueDeclare(queueName, true, false, false, null); 复制代码
第一个参数:queuename:队列的名称 第二个参数:durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失 第三个参数:exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel可以去监听,其他channel都不能够监听。目的就是为了保证顺序消费。 第四个参数:autoDelete:队列如果与Exchange未绑定,则自动删除 第五个参数:arguments:扩展参数
channel.basicConsume(QUEUE_NAME, true, consumer); 复制代码
第二个参数 autoAck:自动签收消息
(1)启动消费端
(2)查看管控台
可以看到已经有一个连接,一个信道,一个消费者等信息了。
可以看到信道目前的状态是 空闲状态。
队列中多了test001队列。
关于管控台的介绍可以看这篇文章:
(3)运行生产端
可以看到生产端发送完消息之后停下了,消费端迅速接收到了消息。也可以继续通过管控台观察消费的情况。
(4) 问题
这里面可能有一个问题:为什么要先启动消费端呢?
因为在消费端创建的队列,我们必须要有队列,才能够发送消息。
另一个问题:在生产端代码中:
channel.basicPublish("", "test001", null, msg.getBytes()); 复制代码
并没有设置exchange,只设置了队列名称,消费端却依然能够消费到消息,这是为什么呢?
答:发消息的一定要指定Exchange,如果不指定Exchange或者Exchange为空的话,它会默认走第一个
它的路由规则:将相同命名的队列Queue的消息路由过去,如果路由不过去,将会把消息删除。
欢迎关注个人微信公众号: Coder编程 获取最新原创技术文章和免费学习资料,更有大量精品思维导图、面试资料、PMP备考资料等你来领,方便你随时随地学习技术知识! 新建了一个qq群:315211365,欢迎大家进群交流一起学习。谢谢了!也可以介绍给身边有需要的朋友。
文章收录至 Github: github.com/CoderMerlin… Gitee: gitee.com/573059382/c… 欢迎 关注 并star~
参考文章:
www.cnblogs.com/myJavaEE/p/…
《RabbitMQ消息中间件精讲》