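The earlier article "Spring Boot Series 13: Integrating RabbitMQ with Spring Boot" showed how to use RabbitMQ in Spring Boot. This article walks through the source code to analyze how Spring Boot integrates RabbitMQ.
The spring.factories file in spring-boot-autoconfigure.jar contains the following definition, which registers RabbitAutoConfiguration as an auto-configuration class that is processed when Spring starts: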
…
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration,\
…
The application_*.yml properties file:
# RabbitMQ configuration
spring:
  rabbitmq:
    host: 10.240.80.134
    username: spring-boot
    password: spring-boot
    virtual-host: spring-boot-vhost
The properties above are bound to RabbitProperties:
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
    …
}
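RabbitAutoConfiguration is a configuration class. On startup it initializes the RabbitProperties object mentioned above and then imports another configuration class, RabbitAnnotationDrivenConfiguration, which deals with message listening and is covered later.
It has three inner classes, all of them configuration classes, which conditionally initialize the classes RabbitMQ needs.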
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
// Initializes RabbitProperties
@EnableConfigurationProperties(RabbitProperties.class)
// Imports the @Configuration class RabbitAnnotationDrivenConfiguration
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
    …
}
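The inner class RabbitConnectionFactoryCreator builds a CachingConnectionFactory instance (an implementation of ConnectionFactory) from the parameters in RabbitProperties. This instance acts as the connection pool for RabbitMQ.
CachingConnectionFactory wraps the com.rabbitmq.client.ConnectionFactory and com.rabbitmq.client.Channel provided by the official RabbitMQ client and caches connections and channels. It has two cache modes:
1. With CacheMode#CHANNEL, every call to createConnection() returns the same Connection. By default only one Connection is created, and the number of cached Channels is configurable. In other words, multiple Channels can be created, but they all share the same Connection.
2. With CacheMode#CONNECTION, both the number of cached Connections and the number of cached Channels can be configured. A call to createConnection() takes an available Connection from the cache; if none is available and the limit has not been reached, a new Connection is created. Channels are handled the same way.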
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config) throws Exception {
        // Configure the RabbitMQ connection factory from RabbitProperties
        RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
        if (config.determineHost() != null) {
            factory.setHost(config.determineHost());
        }
        …
        factory.afterPropertiesSet();
        // The caching connection factory
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory.getObject());
        connectionFactory.setAddresses(config.determineAddresses());
        connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
        connectionFactory.setPublisherReturns(config.isPublisherReturns());
        …
        return connectionFactory;
    }
}
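To make the CHANNEL cache mode more concrete, here is a minimal plain-Java model of the idea: one shared connection, plus a bounded cache of idle channels. It is a sketch only and not the Spring AMQP implementation; the class ChannelCacheSketch, its nested Connection and Channel types, and the methods checkout() and release() are hypothetical names invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of CHANNEL cache mode (hypothetical, not Spring AMQP code).
public class ChannelCacheSketch {

    static final class Connection {
        final int id;
        Connection(int id) { this.id = id; }
    }

    static final class Channel {
        final Connection connection;
        Channel(Connection connection) { this.connection = connection; }
    }

    private final int channelCacheSize;
    private final Deque<Channel> idleChannels = new ArrayDeque<>();
    private Connection sharedConnection;
    private int connectionsCreated;

    public ChannelCacheSketch(int channelCacheSize) {
        this.channelCacheSize = channelCacheSize;
    }

    // Always returns the same connection; it is created lazily on first use.
    public synchronized Connection createConnection() {
        if (sharedConnection == null) {
            sharedConnection = new Connection(++connectionsCreated);
        }
        return sharedConnection;
    }

    // Reuses an idle channel if one is cached, otherwise opens a new one
    // on the shared connection.
    public synchronized Channel checkout() {
        Channel cached = idleChannels.poll();
        return cached != null ? cached : new Channel(createConnection());
    }

    // Returns true if the channel was kept in the cache, false if the cache
    // was full (a real implementation would physically close it then).
    public synchronized boolean release(Channel channel) {
        if (idleChannels.size() < channelCacheSize) {
            idleChannels.push(channel);
            return true;
        }
        return false;
    }
}
```

In this model the cache size bounds how many idle channels are kept, not how many channels may be open at once; every channel still belongs to the single shared connection.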
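The inner class RabbitTemplateConfiguration receives the RabbitProperties settings and the MessageConverter through its constructor and stores them in fields. Its rabbitTemplate() and amqpAdmin() methods then use the CachingConnectionFactory instance created by RabbitConnectionFactoryCreator to create the RabbitTemplate and the RabbitAdmin.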
@Configuration
// Imports RabbitConnectionFactoryCreator
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {

    private final ObjectProvider<MessageConverter> messageConverter;
    private final RabbitProperties properties;

    // Injects MessageConverter and RabbitProperties
    public RabbitTemplateConfiguration(ObjectProvider<MessageConverter> messageConverter,
            RabbitProperties properties) {
        this.messageConverter = messageConverter;
        this.properties = properties;
    }

    // Initializes RabbitTemplate
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnMissingBean(RabbitTemplate.class)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        // Create the RabbitTemplate
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        MessageConverter messageConverter = this.messageConverter.getIfUnique();
        if (messageConverter != null) {
            // Configure the MessageConverter
            rabbitTemplate.setMessageConverter(messageConverter);
        }
        // Other settings omitted
        …
        return rabbitTemplate;
    }

    // Initializes AmqpAdmin
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
    @ConditionalOnMissingBean(AmqpAdmin.class)
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        // Create the RabbitAdmin
        return new RabbitAdmin(connectionFactory);
    }
}
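The @ConditionalOnMissingBean conditions used on these beans mean that the auto-configured default backs off when the application has already defined its own bean. The plain-Java sketch below models only that back-off idea with a map keyed by type. MissingBeanSketch and its methods are hypothetical names, and the real condition also matches subtypes and bean names, which this model ignores.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified model of "register the default only if no bean exists"
// (hypothetical, not Spring Boot code).
public class MissingBeanSketch {

    private final Map<Class<?>, Object> beans = new LinkedHashMap<>();

    // Stands in for a bean the application defines itself.
    public <T> void register(Class<T> type, T bean) {
        beans.put(type, bean);
    }

    // Stands in for an auto-configured @Bean method guarded by
    // @ConditionalOnMissingBean: the supplier runs only when nothing
    // is registered for the type yet.
    public <T> T registerIfMissing(Class<T> type, Supplier<T> defaultBean) {
        Object existing = beans.get(type);
        if (existing != null) {
            return type.cast(existing);
        }
        T created = defaultBean.get();
        beans.put(type, created);
        return created;
    }
}
```

Applied to the classes above, this is why declaring your own RabbitTemplate or ConnectionFactory bean replaces the auto-configured one instead of conflicting with it.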
Inner configuration class: MessagingTemplateConfiguration
Its rabbitMessagingTemplate() method receives the RabbitTemplate instance created above and uses it to create a RabbitMessagingTemplate.
@Configuration
@ConditionalOnClass(RabbitMessagingTemplate.class)
@ConditionalOnMissingBean(RabbitMessagingTemplate.class)
// Imports the RabbitTemplateConfiguration configuration class
@Import(RabbitTemplateConfiguration.class)
protected static class MessagingTemplateConfiguration {

    // Creates the RabbitMessagingTemplate; the RabbitTemplate comes from RabbitTemplateConfiguration
    @Bean
    @ConditionalOnSingleCandidate(RabbitTemplate.class)
    public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
        return new RabbitMessagingTemplate(rabbitTemplate);
    }
}
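The configuration above completes the initialization of the sender-side RabbitMQ beans: RabbitTemplate can now be used to send messages, and RabbitAdmin to declare queues, exchanges, and bindings. Listening for RabbitMQ messages requires the additional configuration below, which is more complex.
RabbitAutoConfiguration imports RabbitAnnotationDrivenConfiguration, which creates the beans related to message listening. Let's look at this class in detail.
Its constructor receives the MessageConverter, MessageRecoverer, and RabbitProperties instances needed for listening and stores them as fields.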
@Configuration
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {

    private final ObjectProvider<MessageConverter> messageConverter;
    private final ObjectProvider<MessageRecoverer> messageRecoverer;
    private final RabbitProperties properties;

    RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
            ObjectProvider<MessageRecoverer> messageRecoverer,
            RabbitProperties properties) {
        this.messageConverter = messageConverter;
        this.messageRecoverer = messageRecoverer;
        this.properties = properties;
    }
    …
}
The class creates a SimpleRabbitListenerContainerFactoryConfigurer object, which holds the MessageConverter, MessageRecoverer, and RabbitProperties instances needed to create a listener container.
// Creates the SimpleRabbitListenerContainerFactoryConfigurer and sets the MessageConverter,
// the MessageRecoverer, and the RabbitMQ properties on it
@Bean
@ConditionalOnMissingBean
public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
    SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
    configurer.setMessageConverter(this.messageConverter.getIfUnique());
    configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
    configurer.setRabbitProperties(this.properties);
    return configurer;
}
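It then creates a SimpleRabbitListenerContainerFactory instance (an implementation of RabbitListenerContainerFactory). The SimpleRabbitListenerContainerFactoryConfigurer argument comes from the method above, and the ConnectionFactory comes from RabbitAutoConfiguration, as explained earlier.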
@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer,
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}
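For the inner class EnableRabbitConfiguration, the part that matters is its @EnableRabbit annotation. This annotation makes use of the listener container factory configured above, creates the other related beans, and starts listening for messages. The next section covers @EnableRabbit in detail.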
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {
}
@EnableRabbit imports the configuration class RabbitBootstrapConfiguration:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// Imports the configuration class RabbitBootstrapConfiguration
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}
RabbitBootstrapConfiguration creates the RabbitListenerAnnotationBeanPostProcessor and the RabbitListenerEndpointRegistry.
@Configuration
public class RabbitBootstrapConfiguration {

    // Creates the RabbitListenerAnnotationBeanPostProcessor. Incoming messages are dispatched
    // to the methods annotated with @RabbitListener / @RabbitHandler.
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();
    }

    // Creates the RabbitListenerEndpointRegistry, where listener endpoints are registered
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
}
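RabbitListenerAnnotationBeanPostProcessor implements BeanPostProcessor. After Spring creates a bean, it picks up every method annotated with @RabbitListener or @RabbitHandler.
Once all singletons have been instantiated, afterSingletonsInstantiated() performs the initialization. The key operations are:
1. Look up the RabbitListenerEndpointRegistry instance and set it on the RabbitListenerEndpointRegistrar.
2. Set containerFactoryBeanName on the RabbitListenerEndpointRegistrar to rabbitListenerContainerFactory.
3. Call RabbitListenerEndpointRegistrar.afterPropertiesSet() to initialize the registrar; this method is described later.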
// Create the registrar instance
private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();

@Override
public void afterSingletonsInstantiated() {
    …
    // Look up the RabbitListenerEndpointRegistry and set it on the RabbitListenerEndpointRegistrar
    if (this.registrar.getEndpointRegistry() == null) {
        if (this.endpointRegistry == null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to find endpoint registry by bean name");
            this.endpointRegistry = this.beanFactory.getBean(
                    RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                    RabbitListenerEndpointRegistry.class);
        }
        this.registrar.setEndpointRegistry(this.endpointRegistry);
    }
    // Set containerFactoryBeanName on the registrar (rabbitListenerContainerFactory)
    if (this.containerFactoryBeanName != null) {
        this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
    }
    // Set the custom handler method factory once resolved by the configurer
    MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
    if (handlerMethodFactory != null) {
        this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
    }
    // Actually register all listeners: initializes the RabbitListenerEndpointRegistrar
    this.registrar.afterPropertiesSet();
}
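postProcessAfterInitialization() runs after a bean has been initialized and picks up every method annotated with @RabbitListener or @RabbitHandler.
1. If @RabbitListener is placed on a method, processAmqpListener() is called and the target method is wrapped in a MethodRabbitListenerEndpoint.
2. If @RabbitListener is placed on a class and the class has methods annotated with @RabbitHandler, processMultiMethodListeners() is called and the target methods are wrapped in a MultiMethodRabbitListenerEndpoint.
MultiMethodRabbitListenerEndpoint is a subclass of MethodRabbitListenerEndpoint, and both are RabbitListenerEndpoint implementations.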
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    …
    // Process all methods annotated with @RabbitListener
    for (ListenerMethod lm : metadata.listenerMethods) {
        for (RabbitListener rabbitListener : lm.annotations) {
            processAmqpListener(rabbitListener, lm.method, bean, beanName);
        }
    }
    // Process all methods annotated with @RabbitHandler
    if (metadata.handlerMethods.length > 0) {
        processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
    }
    return bean;
}

private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
        Object bean, String beanName) {
    …
    for (RabbitListener classLevelListener : classLevelListeners) {
        // Create the endpoint for a class with several handler methods
        MultiMethodRabbitListenerEndpoint endpoint = new MultiMethodRabbitListenerEndpoint(checkedMethods, bean);
        endpoint.setBeanFactory(this.beanFactory);
        processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
    }
}

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    // Create the endpoint for a single listener method
    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
    endpoint.setMethod(methodToUse);
    endpoint.setBeanFactory(this.beanFactory);
    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
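The core technique here is scanning a bean's methods for an annotation and turning each match into an endpoint. The following plain-Java sketch shows that technique with reflection only. It is not the Spring AMQP code: the @Listener annotation, the Endpoint class, the OrderHandler bean, and the scan() and dispatch() methods are all hypothetical stand-ins.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Simplified model of annotation-driven endpoint discovery (hypothetical).
public class ListenerScanSketch {

    // Stand-in for @RabbitListener on a method.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listener {
        String queue();
    }

    // Stand-in for MethodRabbitListenerEndpoint: bean + method + queue.
    static final class Endpoint {
        final Object bean;
        final Method method;
        final String queue;

        Endpoint(Object bean, Method method, String queue) {
            this.bean = bean;
            this.method = method;
            this.queue = queue;
        }
    }

    // What a post-processor would do for each bean: collect annotated methods.
    static List<Endpoint> scan(Object bean) {
        List<Endpoint> endpoints = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener != null) {
                endpoints.add(new Endpoint(bean, method, listener.queue()));
            }
        }
        return endpoints;
    }

    // Delivers a payload to the endpoint's method via reflection.
    static Object dispatch(Endpoint endpoint, Object payload) {
        try {
            return endpoint.method.invoke(endpoint.bean, payload);
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Listener invocation failed", ex);
        }
    }

    // Example bean with one listener method and one ordinary method.
    static class OrderHandler {
        @Listener(queue = "orders")
        public String onOrder(String body) {
            return "handled " + body;
        }

        public void notAListener() {
        }
    }
}
```

Scanning an OrderHandler instance yields one endpoint for the queue "orders"; the unannotated method is ignored.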
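Both processAmqpListener() and processMultiMethodListeners() end up in processListener(), which does the following:
1. Uses the attributes of the @RabbitListener annotation to set the queues to listen on, the priority, the exclusive flag, and so on, on the MethodRabbitListenerEndpoint.
2. Looks up the RabbitAdmin instance and sets it on the MethodRabbitListenerEndpoint.
3. Looks up the RabbitListenerContainerFactory named by the containerFactory() attribute of @RabbitListener; the attribute is empty by default.
4. Calls the helper class RabbitListenerEndpointRegistrar to register the RabbitListenerEndpoint with the RabbitListenerEndpointRegistry. RabbitListenerEndpointRegistrar is explained later.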
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener,
        Object bean, Object adminTarget, String beanName) {
    // Set the queues to listen on, the priority, the exclusive flag, etc. on the endpoint
    …
    // Look up the RabbitAdmin instance and set it on the MethodRabbitListenerEndpoint
    String rabbitAdmin = resolve(rabbitListener.admin());
    if (StringUtils.hasText(rabbitAdmin)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
        try {
            endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register rabbit listener endpoint on ["
                    + adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '"
                    + rabbitAdmin + "' was found in the application context", ex);
        }
    }
    // Look up the RabbitListenerContainerFactory named by containerFactory() of @RabbitListener
    RabbitListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register rabbit listener endpoint on ["
                    + adminTarget + "] for bean " + beanName + ", no "
                    + RabbitListenerContainerFactory.class.getSimpleName() + " with id '"
                    + containerFactoryBeanName + "' was found in the application context", ex);
        }
    }
    // Use the RabbitListenerEndpointRegistrar to register the endpoint with the
    // RabbitListenerEndpointRegistry; the registrar is explained below
    this.registrar.registerEndpoint(endpoint, factory);
}
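RabbitListenerEndpointRegistrar is the helper class that registers the RabbitListenerEndpoint instances described above with the RabbitListenerEndpointRegistry.
Its afterPropertiesSet() method performs the initialization. It is called from afterSingletonsInstantiated() of RabbitListenerAnnotationBeanPostProcessor. The main logic is: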
private RabbitListenerEndpointRegistry endpointRegistry;
...
// Initialization
@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}

protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        // AmqpListenerEndpointDescriptor holds a RabbitListenerEndpoint and its RabbitListenerContainerFactory
        for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            // Register the endpoint together with its container factory on the endpointRegistry
            this.endpointRegistry.registerListenerContainer(
                    descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        // Set the flag to true
        this.startImmediately = true; // trigger immediate startup
    }
}

/**
 * Resolves the RabbitListenerContainerFactory.
 * If the endpoint was registered with its own factory, that one is used (it comes from
 * containerFactory() of @RabbitListener). Otherwise the default factory is used. If no
 * default is set yet, the bean named containerFactoryBeanName is fetched from the Spring
 * container and becomes the default. As shown earlier, RabbitListenerAnnotationBeanPostProcessor
 * sets that name to rabbitListenerContainerFactory in afterSingletonsInstantiated().
 */
private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
    if (descriptor.containerFactory != null) {
        return descriptor.containerFactory;
    }
    else if (this.containerFactory != null) {
        return this.containerFactory;
    }
    else if (this.containerFactoryBeanName != null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        this.containerFactory = this.beanFactory.getBean(
                this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
        return this.containerFactory; // Consider changing this if live change of the factory is required
    }
    else {
        throw new IllegalStateException("Could not resolve the "
                + RabbitListenerContainerFactory.class.getSimpleName() + " to use for ["
                + descriptor.endpoint + "] no factory was given and no default is set.");
    }
}
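The fallback order in resolveContainerFactory() is easy to lose in the nested branches, so here is a minimal plain-Java model of it: endpoint-specific factory first, then the cached default, then a lookup by bean name, and finally an error. Strings stand in for factory objects and a map stands in for the bean factory; FactoryResolutionSketch and all of its members are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the container factory fallback chain (hypothetical).
public class FactoryResolutionSketch {

    // Stand-in for the Spring bean factory: bean name -> factory.
    private final Map<String, String> beans = new HashMap<>();
    private String defaultFactory;
    private String defaultFactoryBeanName;

    public void addBean(String name, String factory) {
        beans.put(name, factory);
    }

    public void setDefaultFactoryBeanName(String name) {
        this.defaultFactoryBeanName = name;
    }

    // endpointFactory is the factory given at registration time, or null.
    public String resolve(String endpointFactory) {
        if (endpointFactory != null) {
            return endpointFactory;              // 1. explicit factory wins
        }
        if (defaultFactory != null) {
            return defaultFactory;               // 2. previously resolved default
        }
        if (defaultFactoryBeanName != null) {
            String found = beans.get(defaultFactoryBeanName);
            if (found == null) {
                throw new IllegalStateException("No bean named " + defaultFactoryBeanName);
            }
            defaultFactory = found;              // 3. look up once, then cache
            return found;
        }
        throw new IllegalStateException("no factory was given and no default is set");
    }
}
```

Note the caching in step 3: after the first lookup the default is reused, so replacing the bean later would have no effect in this model.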
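registerEndpoint() is called from processListener() of RabbitListenerAnnotationBeanPostProcessor. The main logic is:
As shown above, initialization sets startImmediately=true, so only the true branch is analyzed here. In that branch, registerListenerContainer() of RabbitListenerEndpointRegistry is called with startImmediately=true. Endpoints registered before afterPropertiesSet() has run take the else branch: they are queued in endpointDescriptors and registered later by registerAllEndpoints().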
// RabbitListenerAnnotationBeanPostProcessor calls this method to register an endpoint
private RabbitListenerEndpointRegistry endpointRegistry;

public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
    …
    // Factory may be null, we defer the resolution right before actually creating the container
    AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        if (this.startImmediately) { // Register and start immediately
            // startImmediately was set to true during initialization; only this branch is analyzed.
            // Register the endpoint on the endpointRegistry.
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                    resolveContainerFactory(descriptor), true);
        }
        else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}
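The interplay between registerEndpoint() and registerAllEndpoints() is a "queue until initialized, then register directly" pattern. A minimal plain-Java sketch of that pattern follows, with strings as endpoint ids. DeferredRegistrationSketch and its members are hypothetical names, and the real registrar hands containers to a registry rather than to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of deferred vs. immediate endpoint registration (hypothetical).
public class DeferredRegistrationSketch {

    private final List<String> pending = new ArrayList<>();
    private final List<String> registered = new ArrayList<>();
    private boolean startImmediately;

    // Before initialization endpoints are only queued; afterwards they are
    // registered right away.
    public synchronized void registerEndpoint(String endpointId) {
        if (startImmediately) {
            registered.add(endpointId);
        } else {
            pending.add(endpointId);
        }
    }

    // Registers everything queued so far and switches to immediate mode.
    public synchronized void afterPropertiesSet() {
        registered.addAll(pending);
        startImmediately = true;
    }

    public synchronized List<String> registered() {
        return new ArrayList<>(registered);
    }
}
```

In this model an endpoint found while beans are still being created waits in the queue, whereas one registered after initialization (for example from a lazily created bean) is registered as soon as it is seen.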
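RabbitListenerEndpointRegistry creates a MessageListenerContainer instance for each registered RabbitListenerEndpoint, initializes the container, and finally calls its start() method. It also manages the lifecycle of the listener containers.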
// Creates a listener container for the given RabbitListenerEndpoint
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
        boolean startImmediately) {
    Assert.notNull(endpoint, "Endpoint must not be null");
    Assert.notNull(factory, "Factory must not be null");
    String id = endpoint.getId();
    Assert.hasText(id, "Endpoint id must not be empty");
    synchronized (this.listenerContainers) {
        Assert.state(!this.listenerContainers.containsKey(id),
                "Another endpoint is already registered with id '" + id + "'");
        // Create a listener container for the endpoint using the factory (see the method below)
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        // Optional container group handling
        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
            List<MessageListenerContainer> containerGroup;
            if (this.applicationContext.containsBean(endpoint.getGroup())) {
                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
            }
            else {
                containerGroup = new ArrayList<MessageListenerContainer>();
                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
            }
            containerGroup.add(container);
        }
        if (startImmediately) {
            // Start the MessageListenerContainer; the value passed in here is true
            startIfNecessary(container);
        }
    }
}

// Creates a listener container for the endpoint using the factory
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
        RabbitListenerContainerFactory<?> factory) {
    // Use the RabbitListenerContainerFactory configured for the endpoint. By default this is a
    // SimpleRabbitListenerContainerFactory, which creates a SimpleMessageListenerContainer.
    MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
    // Initialize the container
    if (listenerContainer instanceof InitializingBean) {
        try {
            ((InitializingBean) listenerContainer).afterPropertiesSet();
        }
        catch (Exception ex) {
            throw new BeanInitializationException("Failed to initialize message listener container", ex);
        }
    }
    …
    return listenerContainer;
}
startIfNecessary() calls start() on the MessageListenerContainer to start the container:
// Starts the container
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        // Containers auto-start by default, so this branch is taken
        listenerContainer.start();
    }
}
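SimpleRabbitListenerContainerFactory is an implementation of RabbitListenerContainerFactory.
Its createContainerInstance() method creates the SimpleMessageListenerContainer instance: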
// Creates the SimpleMessageListenerContainer instance
@Override
protected SimpleMessageListenerContainer createContainerInstance() {
    return new SimpleMessageListenerContainer();
}
createListenerContainer() calls createContainerInstance() to create the SimpleMessageListenerContainer instance, applies the settings carried by the RabbitListenerEndpoint to it, and finally calls initializeContainer() to initialize the instance.
@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
    // Create the instance
    C instance = createContainerInstance();
    // Set the container's initial values
    if (this.connectionFactory != null) {
        instance.setConnectionFactory(this.connectionFactory);
    }
    // Remaining settings copied from this factory's fields are omitted
    …
    instance.setListenerId(endpoint.getId());
    endpoint.setupListenerContainer(instance);
    // Initialize the container
    initializeContainer(instance);
    return instance;
}
initializeContainer() initializes the SimpleMessageListenerContainer instance that was just created by copying this factory's fields onto it.
// Initializes the container from the factory's settings
@Override
protected void initializeContainer(SimpleMessageListenerContainer instance) {
    super.initializeContainer(instance);
    if (this.applicationContext != null) {
        instance.setApplicationContext(this.applicationContext);
    }
    if (this.taskExecutor != null) {
        instance.setTaskExecutor(this.taskExecutor);
    }
    if (this.transactionManager != null) {
        instance.setTransactionManager(this.transactionManager);
    }
    if (this.txSize != null) {
        instance.setTxSize(this.txSize);
    }
    // Remaining settings copied from this factory's fields are omitted
    …
}
RabbitListenerEndpointRegistry calls the start() method of SimpleMessageListenerContainer.
If the container has not been initialized yet, start() first runs its initialization:
// Initialization; the main work happens in doStart()
@Override
public void start() {
    if (!this.initialized) {
        synchronized (this.lifecycleMonitor) {
            if (!this.initialized) {
                afterPropertiesSet();
                this.initialized = true;
            }
        }
    }
    try {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting Rabbit listener container.");
        }
        // Call the subclass implementation
        doStart();
    }
    catch (Exception ex) {
        throw convertRabbitAccessException(ex);
    }
}
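The main steps of doStart() are:
Step 1: obtain the RabbitAdmin instance.
Step 2: initialize the RabbitAdmin, which declares new queues, exchanges, and bindings on RabbitMQ.
Step 3: call the parent class's doStart() method.
Step 4: start the consumers in the thread pool so that they begin consuming messages.
Step 5: check whether each consumer started successfully, and throw an exception if one failed.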
protected void doStart() throws Exception {
    if (getMessageListener() instanceof ListenerContainerAware) {
        // Verify that the listener's expected queues match the container's queues
        …
        Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
        if (expectedQueueNames != null) {
            String[] queueNames = getQueueNames();
            Assert.state(expectedQueueNames.size() == queueNames.length,
                    "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                            + Arrays.asList(queueNames));
            boolean found = false;
            for (String queueName : queueNames) {
                if (expectedQueueNames.contains(queueName)) {
                    found = true;
                }
                else {
                    found = false;
                    break;
                }
            }
            Assert.state(found, "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                    + Arrays.asList(queueNames));
        }
    }
    // Obtain the RabbitAdmin
    if (this.rabbitAdmin == null && this.getApplicationContext() != null) {
        Map<String, RabbitAdmin> admins = this.getApplicationContext().getBeansOfType(RabbitAdmin.class);
        if (admins.size() == 1) {
            this.rabbitAdmin = admins.values().iterator().next();
        }
        else {
            …
        }
    }
    // RabbitAdmin initialization: declare new queues, exchanges, and bindings on RabbitMQ
    checkMismatchedQueues();
    // Call the parent class's method
    super.doStart();
    synchronized (this.consumersMonitor) {
        // Initialize the consumers; this creates the BlockingQueueConsumer instances
        int newConsumers = initializeConsumers();
        …
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
        // Start each consumer in the thread pool. AsyncMessageProcessingConsumer is a Runnable that
        // calls start() on the BlockingQueueConsumer to receive and process messages.
        for (BlockingQueueConsumer consumer : this.consumers) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            // Run the AsyncMessageProcessingConsumer in the thread pool
            this.taskExecutor.execute(processor);
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        // Check whether each consumer started successfully; throw an exception on failure
        for (AsyncMessageProcessingConsumer processor : processors) {
            FatalListenerStartupException startupException = processor.getStartupException();
            if (startupException != null) {
                throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
            }
        }
    }
}
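Steps 4 and 5 follow a common concurrency pattern: hand each consumer to an executor, then wait for every one to report whether its startup succeeded. The plain-Java sketch below models that pattern with a CountDownLatch per processor. It is not the Spring AMQP code: ConsumerStartupSketch, Processor, and startAll() are hypothetical names, a Runnable stands in for BlockingQueueConsumer.start(), and the 60-second wait is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model of "start consumers on a pool, then check startup" (hypothetical).
public class ConsumerStartupSketch {

    static final class Processor implements Runnable {
        private final Runnable consumerStart;
        private final CountDownLatch started = new CountDownLatch(1);
        private volatile RuntimeException startupException;

        Processor(Runnable consumerStart) {
            this.consumerStart = consumerStart;
        }

        @Override
        public void run() {
            try {
                consumerStart.run();          // stand-in for consumer.start()
            } catch (RuntimeException ex) {
                startupException = ex;        // remember the failure for the caller
            } finally {
                started.countDown();          // signal that startup was attempted
            }
        }

        // Blocks until startup finished (or the assumed timeout elapsed).
        RuntimeException getStartupException() throws InterruptedException {
            started.await(60, TimeUnit.SECONDS);
            return startupException;
        }
    }

    // Starts all consumers and returns how many started; throws if any failed.
    static int startAll(Runnable... consumers) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Processor> processors = new ArrayList<>();
            for (Runnable consumer : consumers) {
                Processor processor = new Processor(consumer);
                processors.add(processor);
                pool.execute(processor);
            }
            for (Processor processor : processors) {
                RuntimeException failure = processor.getStartupException();
                if (failure != null) {
                    throw new IllegalStateException("Fatal exception on listener startup", failure);
                }
            }
            return processors.size();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for startup", ex);
        } finally {
            pool.shutdown();
        }
    }
}
```

The latch is what lets the starting thread surface a failure that happened on a pool thread, which is the role getStartupException() plays in the container.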
RabbitAdmin.initialize() declares new queues, exchanges, and bindings on RabbitMQ:
public void initialize() {
    …
    // Collect all exchanges, queues, and bindings from the application context
    Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
            this.applicationContext.getBeansOfType(Exchange.class).values());
    Collection<Queue> contextQueues = new LinkedList<Queue>(
            this.applicationContext.getBeansOfType(Queue.class).values());
    Collection<Binding> contextBindings = new LinkedList<Binding>(
            this.applicationContext.getBeansOfType(Binding.class).values());
    …
    // Declare them on RabbitMQ, creating the corresponding exchanges, queues, and bindings
    this.rabbitTemplate.execute(new ChannelCallback<Object>() {
        @Override
        public Object doInRabbit(Channel channel) throws Exception {
            declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
            declareQueues(channel, queues.toArray(new Queue[queues.size()]));
            declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
            return null;
        }
    });
}
AsyncMessageProcessingConsumer implements Runnable; its run() method calls start() on the BlockingQueueConsumer.
private final class AsyncMessageProcessingConsumer implements Runnable {

    private final BlockingQueueConsumer consumer;
    private final CountDownLatch start;
    private volatile FatalListenerStartupException startupException;

    private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
        this.consumer = consumer;
        this.start = new CountDownLatch(1);
    }

    @Override
    public void run() {
        …
        try {
            if (SimpleMessageListenerContainer.this.autoDeclare) {
                SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
            }
            // Call start() on the BlockingQueueConsumer
            this.consumer.start();
            …
        }
        …
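BlockingQueueConsumer: start() calls basicConsume to subscribe to messages. At this point Spring is calling classes from the RabbitMQ client jar (channel.basicConsume), which means we have reached the bottom layer, so the source code analysis ends here.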
void start() throws AmqpException {
    try {
        for (String queueName : this.queues) {
            // Subscribe to every queue in turn
            if (!this.missingQueues.contains(queueName)) {
                consumeFromQueue(queueName);
            }
        }
    }
    catch (IOException e) {
        throw RabbitExceptionTranslator.convertRabbitAccessException(e);
    }
}

private void consumeFromQueue(String queue) throws IOException {
    // Call basicConsume to subscribe. This is where Spring calls into the
    // RabbitMQ client jar (channel.basicConsume).
    String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
            (this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), false, this.exclusive,
            this.consumerArgs, this.consumer);
    if (consumerTag != null) {
        this.consumerTags.put(consumerTag, queue);
    }
    ...
}