在Spring启动时,利用Spring Bean管理工厂BeanFactory接口,实现动态创建交换机、队列、交换机和队列的绑定关系,让我们无需进行重复的编码工作。
当发送消息发送至“fanout”类型交换机时,RabbitMQ将会忽略路由键,直接将消息发送至该交换机下所有队列。从而被每一个队列的消费者进行消费。
当发送消息发送至“direct”类型交换机时,RabbitMQ将会查找与传入路由键完全匹配的队列,然后将消息放入队列当中,从被队列的消费者进行消费。
当发送消息发送至“topic”类型交换机时,RabbitMQ将会查找与传入路由键规则匹配的队列,然后将消息放入队列当中,从被队列的消费者进行消费。
当发送消息发送至“headers”类型交换机时,RabbitMQ将会忽略路由键,根据在绑定路由器及队列时设置的完全匹配或部分匹配作为依据,以消息内容中的headers属性对队列进行匹配,匹配成功的队列才可以收到该消息从而进一步被消费者消费。
manual模式(手动ACK)下,注意一定要进行消息的确认。如果忘记了ACK,那么后果很严重。当consumer退出时候,message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的。
说明:RabbitMQ交换器类型枚举
作用:声明了交换器的常用类型
操作方式:无需操作
备注:无
说明:RabbitMQ交换器枚举
作用:声明了项目需要使用到的交换机
操作方式: 按需增删
备注:由于fanout类型的交换机会忽略路由键直接下发所有的路由器,故每个不同的广播都应新建一个交换机
说明:队列参数
作用:声明了在创建队列时使用到的参数Map
操作方式: 按需修改
备注:无
说明:队列头信息
作用:声明了ExchangeTypeEnum.headers类型的交换机所使用的匹配信息
操作方式: 按需修改
备注:无
说明:队列配置枚举
作用:声明了项目需要使用的队列
操作方式: 无需操作
备注:所有指定为ExchangeEnum.fanout的队列,都将在队列名称后面追加“.10_87_10_*”(数字部分为服务所在IP地址)
说明:RabbitMQ核心配置类
作用:实现动态创建交换机、动态创建队列、动态绑定
操作方式: 无需操作
备注:该类已将所有队列名称以Map形式注册至spring bean工厂,bean名称“queuesNames”,结构:Map<String, String>(Map<队列配置枚举名称, 队列名称>) 消费者可直接使用@RabbitListener(queues = {"#{queuesNames.队列配置枚举名称}"})进行监听
说明:默认回调类,实现自MQCallback
作用:消息发送成功或失败的回调类
操作方式: 无需操作
备注:可手动进行实现,调用queueMessageServiceImpl.setMQCallback(mqCallback)方法即可实现修改
说明:默认消息发送类,实现自QueueMessageService
作用:实现了消息发送的基本接口
操作方式: 无需操作
备注:无
public static <T> boolean registerBean(String beanName, T bean) { // 将applicationContext转换为ConfigurableApplicationContext ConfigurableApplicationContext context = (ConfigurableApplicationContext) SpringContextUtil.getApplicationContext(); // 将bean对象注册到bean工厂 context.getBeanFactory().registerSingleton(beanName, bean); log.debug("【SpringContextUtil】注册实例“" + beanName + "”到spring容器:" + bean); return true; } 复制代码
@Bean("queuesNames") public Map<String, String> queuesNames() { return QueueEnum.getQueuesNames(); } // 这个方法在QueueEnum public static Map<String, String> getQueuesNames() { return Arrays.asList(QueueEnum.values()).stream().collect(Collectors.toMap(queueEnum -> queueEnum.toString(), queueEnum -> queueEnum.getName())); } 复制代码
/** * @return java.lang.Object * @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createExchange(); * @description 动态创建交换机 * @param: this is no-parameter method. * @author zhiqiang94@vip.qq.com * @create 2020/4/16 0016 9:28 */ @Bean("createExchange") public Object createExchange() { // 遍历交换机枚举 ExchangeEnum.toList().forEach(exchangeEnum -> { // 声明交换机 Exchange exchange; // 根据交换机模式 生成不同的交换机 switch (exchangeEnum.getType()) { case fanout: exchange = ExchangeBuilder.fanoutExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build(); break; case topic: exchange = ExchangeBuilder.directExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build(); break; case headers: exchange = ExchangeBuilder.headersExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build(); break; case direct: default: exchange = ExchangeBuilder.topicExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build(); break; } // 将交换机注册到spring bean工厂 让spring实现交换机的管理 if (exchange != null) { SpringContextUtil.registerBean(exchangeEnum.toString() + "_exchange", exchange); } } ); // 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂 return null; } 复制代码
/** * @return java.lang.Object * @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createQueue(); * @description 动态创建队列 * @param: this is no-parameter method. * @author zhiqiang94@vip.qq.com * @create 2020/4/16 0016 9:29 */ @Bean("createQueue") public Object createQueue() { // 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理 QueueEnum.toList().forEach(queueEnum -> SpringContextUtil.registerBean(queueEnum.toString() + "_queue", new Queue(queueEnum.getName(), queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments()))); // 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂 return null; } 复制代码
/** * @return java.lang.Object * @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createBinding(); * @description 动态将交换机及队列绑定 * @param: this is no-parameter method. * @author zhiqiang94@vip.qq.com * @create 2020/4/16 0016 9:29 */ @Bean("createBinding") public Object createBinding() { // 遍历队列枚举 将队列绑定到指定交换机 QueueEnum.toList().forEach(queueEnum -> { // 从spring bean工厂中获取队列对象(刚才注册的) Queue queue = SpringContextUtil.getBean(queueEnum.toString() + "_queue", Queue.class); // 声明绑定关系 Binding binding; // 根据不同的交换机模式 获取不同的交换机对象(注意:刚才注册时使用的是父类Exchange,这里获取的时候将类型获取成相应的子类)生成不同的绑定规则 switch (queueEnum.getExchangeEnum().getType()) { case fanout: FanoutExchange fanoutExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", FanoutExchange.class); binding = BindingBuilder.bind(queue).to(fanoutExchange); break; case topic: TopicExchange topicExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", TopicExchange.class); binding = BindingBuilder.bind(queue).to(topicExchange).with(queueEnum.getRoutingKey()); break; case headers: HeadersExchange headersExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", HeadersExchange.class); if (queueEnum.isWhereAll()) { // whereAll表示全部匹配 binding = BindingBuilder.bind(queue).to(headersExchange).whereAll(queueEnum.getHeaders()).match(); } else { // whereAny表示部分匹配 binding = BindingBuilder.bind(queue).to(headersExchange).whereAny(queueEnum.getHeaders()).match(); } break; case direct: default: DirectExchange directExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", DirectExchange.class); binding = BindingBuilder.bind(queue).to(directExchange).with(queueEnum.getRoutingKey()); break; } // 将绑定关系注册到spring bean工厂 让spring实现绑定关系的管理 if (binding != null) { SpringContextUtil.registerBean(queueEnum.toString() + "_binding", binding); } } ); // 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂 return null; } 复制代码
当消费者方法为void方法时,QueueMessageService.convertSendAndReceive方法返回值为null,注意检查消费者监听方法。
当QueueMessageService.convertSendAndReceive发送的消息被多个消费者消费,返回的结果为第一个消费者的返回值。建议有返回值的队列使用直连模式(direct),这样能保证该消息只有一个消费者进行消费。
广播交换机的角色类似于村里面的小喇叭,当有新的政策传达给这个村时,就会使用小喇叭播放,这样每个人(队列)都停到了内容。
可用于发送类似公告、系统全局通知等广播需求。
直连交换机的角色类似于邮递员,信封上写明了收件地址,收到的消息会根据目的地的不同投入到不同的队列中,若是邮递员没有找到这个地址,这封信就被丢弃了。
可用于点对点之间的通讯。
通配符交换机类似于脑子不聪明的邮件分发员,收到有一封送往“张家镇李家村12号”的信件,分发员来到分发点,发现有两个有污渍的邮箱(队列),一个能看清*李家村*,一个能看清张家镇*,快递员干脆就将信件复制了一份,往两个地方都送了投递了一份。
可用于同一个任务,需要不同模块或者分布式系统的模块同时执行时使用。
头交换机类似于程序中的鉴权,管理员拥有全部权限,用户拥有查看权限,用户无法调用需要管理员权限的接口(消息无法分发到未监听该header的队列),拥有相应权限才能调用相应权限的接口(消息分发到监听该header的队列)。这里需要注意,头交换机支持完全匹配和部分匹配,完全匹配为管理员接口必须拥有所有管理员权限才可调用,即拥有所有管理员权限才显示进入管理员后台的按钮。部分匹配为拥有其中一个或部分权限即可调用,即拥有任意管理员权限就显示进入管理员后台的按钮。