ActiveMq是Apache提供的开源消息系统采用java实现,
很好地支持JMS(Java Message Service,即Java消息服务) 规范
ActiveMq安装: http://activemq.apache.org/co... 在官网下载安装对应的版本
下载完成后解压就可以使用
ActiveMq默认的端口号是8161,用户名和密码都是admin 在本机可以使用 http://localhost :8161 去访问
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
spring.activemq.broker-url=tcp://localhost:61616 #如果是点对点(queue),那么此处默认应该是false,如果发布订阅,那么一定设置为true spring.activemq.packages.trust-all=true spring.activemq.user=admin spring.activemq.password=admin
@Component public class QueueBean{ //创建一个队列实例 @Bean Queue queue(){ //这里设置的消息是队列的名称 return new ActiveMQQueue("hello.javaboy"); } }
@Component public class JmsComponent{ //springboot提供的消息模板 @Autowired JmsMessagingTemplate jmsMessagingTemplate; //自己创建的队列实例 @Autowired Queue queue; /** * 发送消息 * @param message */ public void send(Message message){ jmsMessagingTemplate.convertAndSend(this.queue,message); } /** * 接收消息 * @param message */ //表示监听该队列名称发来的消息 @JmsListener(destination = "hello.javaboy") public void readMessage(Message message){ System.out.println(message); } }
public class Message implements Serializable { private String content;//消息主体 private Date sendDate;//消息发送的时间 //省略get、set、tostring方法 }
在测试类中注入JmsComponent 调用send()方法进行消息的转发
@SpringBootTest class ActivemqApplicationTests { @Autowired JmsComponent jmsComponent; @Test void contextLoads() { Message message = new Message(); message.setContent("hello activeMq"); message.setSendDate(new Date()); jmsComponent.send(message); } }
首先启动项目,在运行测试类进行消息发送:
控制台会打印消息内容:
rabbitmq安装比较繁琐,这里使用docker容器进行安装,docker安装非常方便,一条命令全部搞定
通过docker安装rabbitmq
-P(大p)表示自动映射到主机端口
docker run -d --hostname my-rabbitmq --name some-rabbitmq -P rabbitmq:3-management
首先导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
编写配置文件:
#配置rabbitMQ spring.rabbitmq.host=localhost spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.port=32771
直连交换机:Direct exchange
扇形交换机:Fanout exchange
主体交换机:Topic exchange
首部交换机:Headers exchange
下面分别介绍4中交换模式:
//Direct策略(只转发给routingKey相匹配的用户) @Configuration public class RabbitDirectConfig { public final static String DIRECTNAME = "javaboy-direct"; //消息队列 @Bean Queue queue(){ //name值为队列名称,routingKey会与他进行匹配 return new Queue("hello.RabbitMQ"); } @Bean Queue queue1(){ return new Queue("hello.RabbitMQ1"); } @Bean DirectExchange directExchange(){ //第一个参数为DIRECTNAME、第二个参数表示重启后是否有效,第三参数表示长时间未使用是否删除 return new DirectExchange(DIRECTNAME,true,false); } @Bean Binding binding(){ //将队列queue和DirectExchange绑定在一起 return BindingBuilder.bind(queue()).to(directExchange()).with("direct"); } @Bean Binding binding1(){ //将队列queue和DirectExchange绑定在一起 return BindingBuilder.bind(queue1()).to(directExchange()).with("direct"); } }
//配置消费者 @Component public class DirectReceiver { //只监听queue()队列的消息 @RabbitListener(queues = "hello.RabbitMQ") public void hanlder(String msg){ System.out.println("hanlder>>>"+msg); } //只监听queue1()队列的消息 @RabbitListener(queues = "hello.RabbitMQ1") public void hanlder1(String msg){ System.out.println("hanlder1>>>"+msg); } }
测试代码:
在springboot的测试类中注入RabbitTemplate(springboot提供的RabbitMQ模板)
@Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { //两个参数第一个是routingKey、第二个为消息内容 rabbitTemplate.convertAndSend("hello.RabbitMQ","hello RabbitMQ test"); rabbitTemplate.convertAndSend("hello.RabbitMQ1","hello RabbitMQ test222"); }
启动项目后,运行测试类可以看到只有与routingkey相匹配的消费者受到了对应的消息:
2、Fanout exchange
//Fanout策略(只要是与他绑定的队列,都会收到消息与routingKey无关) @Configuration public class RabbitFanoutConfig { public final static String FANOUTNAME = "javaboy-fanout"; //配置了两个消息队列queueOne和queueTwo @Bean Queue queueOne(){ return new Queue("queue-one"); } @Bean Queue queueTwo(){ return new Queue("queue-two"); } @Bean FanoutExchange fanoutExchange(){ return new FanoutExchange(FANOUTNAME,true,false); } //将两个队列与FanoutExchange绑定 @Bean Binding bindingOne(){ return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } @Bean Binding bindingTwo(){ return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); } }
//配置消费者 @Component public class FanoutReceiver { //两个消费者分别监听两个不同的队列 @RabbitListener(queues = "queue-one") public void hanlder1(String msg){ System.out.println("FanoutReceiver:hanlder1>>>"+msg); } @RabbitListener(queues = "queue-two") public void hanlder2(String msg){ System.out.println("FanoutReceiver:hanlder2>>>"+msg); } }
@Test void rabbitFanout(){ //三个参数表示RabbitFanoutConfig的名称、routingkey、消息内容 rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"hello fanout test"); }
该方式与routingkey无关所有写null即可
查看输出:可以看到两个消费者都收到了消息
topic策略可以根据routingKey的规则(通配符方式)进行去匹配队列进行转发规则为.#. *为单词,#表示模糊匹配
例如routingkey为:xiaomi.# 那么带有xiaomi.开头的队列都会收到该消息
routingkey为:#.phone.# 表示消息的routingKey中带有phone时 就会去匹配带有phone的队列
/topic策略可以根据routingKey的规则(通配符方式)进行去匹配队列进行转发规则为*.#.* //*为单词,#表示模糊匹配 @Configuration public class RabbitTopicConfig { public final static String TOPICNAME = "javaboy-topic"; @Bean TopicExchange topicExchange(){ return new TopicExchange(TOPICNAME,true,false); } @Bean Queue xiaomi(){ return new Queue("xiaomi"); } @Bean Queue huawei(){ return new Queue("huawei"); } @Bean Queue phone(){ return new Queue("phone"); } @Bean Binding xiaomiBinding(){ //xiaomi.#:表示消息的routingKey是以xiaomi开头的就会路由到xiaomi的队列 return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#"); } @Bean Binding huaweiBinding(){ return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#"); } @Bean Binding phoneBinding(){ //#.phone.#:表示消息的routingKey中带phone的都会路由到phone的队列 return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#"); } }
@Component public class TopicReceiver { //分别监听名称为xiaomi、huawei、phone的队列 @RabbitListener(queues = "xiaomi") public void handlerXM(String msg){ System.out.println("TopicReceiver:handlerXM>>>"+msg); } @RabbitListener(queues = "huawei") public void handlerHW(String msg){ System.out.println("TopicReceiver:handlerHW>>>"+msg); } @RabbitListener(queues = "phone") public void handlerPHONE(String msg){ System.out.println("TopicReceiver:handlerPHONE>>>"+msg); } }
@Test void rabbitTopic(){ //根据匹配规则该消息只能被xiaomi的队列收到 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻"); //根据匹配规则该消息只能被phone的队列收到 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"vivo.phone","vivo手机"); //根据匹配规则该消息可以别huawei和phone两个队列收到 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机"); }
查看输出:
可以看到routingkey为huawei.phone的消息匹配了两个队列,其他两个都只匹配了一个队列
该模式是根据路由规则的header进行匹配的,在进行匹配的时候需要传入一个map集合,routingkey去匹配map即可中的key value,匹配规则可以使any或者all,any表示只要包含任意信息就可以,all表示所有信息都必须匹配
@Configuration public class RabbitHeaderConfig { public final static String HEADERNAME = "javaboy-header"; @Bean HeadersExchange headersExchange(){ return new HeadersExchange(HEADERNAME,true,false); } //分别创建两个不同header的队列 @Bean Queue queueName(){ return new Queue("name-queue"); } @Bean Queue queueAge(){ return new Queue("age-queue"); } @Bean Binding bindingName(){ Map<String,Object> map = new HashMap<>(); map.put("name","hello"); //表示如果routingKey匹配的map集合中的key value 就会将消息转发到对应的路由上 return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match(); } @Bean Binding bindingAge(){ return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists(); } }
@Component public class HeaderReceiver { @RabbitListener(queues = "name-queue") public void handlerName(byte[] msg){ System.out.println("HeaderReceiver:handlerName>>>>"+new String(msg,0,msg.length)); } @RabbitListener(queues = "age-queue") public void handlerAge(byte[] msg){ System.out.println("HeaderReceiver:handlerAge>>>>"+new String(msg,0,msg.length)); } }
@Test public void rabbitHeader(){ //设置消息,并且设置header,setHeader("name","hello")分别表示map集合中的key、value Message nameMessage = MessageBuilder.withBody("hello name".getBytes()).setHeader("name","hello").build(); Message ageMessage = MessageBuilder.withBody("hello 99 age".getBytes()).setHeader("age","99").build(); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null,nameMessage); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null,ageMessage); }
查看输出:
改变setheader中的值查看结果:
Message nameMessage = MessageBuilder.withBody("hello name".getBytes()).setHeader("name","javaboy").build();
可以看到因为key、value匹配不上只打印了一条消息。
大家看完有什么不懂的可以在下方留言讨论,也可以关注我私信问我,我看到后都会回答的。也欢迎大家关注我的公众号:前程有光,金三银四跳槽面试季,整理了1000多道将近500多页pdf文档的Java面试题资料,文章都会在里面更新,整理的资料也会放在里面。谢谢你的观看,觉得文章对你有帮助的话记得关注我点个赞支持一下!