转载

springboot配置activemq

网上有好多介绍springboot集成activemq的文章,看了一些文章感觉比较零散,还是抽时间自己详细总结一个如何使用,需要注意哪些点。尤其是关于连接池的配置,需要重点关注,否则在消息量大的情况下会把服务器搞挂。

快速配置

如果你只是连接一个activemq集群或节点,那么配置非常简单(这也是springboot便捷的原因)。

如下:

spring.activemq.broker-url=tcp://127.0.0.1:61616?connectionTimeout=3000&soTimeout=500&tcpNoDelay=true&jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=10
spring.activemq.user=admin
spring.activemq.password=admin

就这么简单!有了上面的配置你就可以发送消息了(通过JmsTemplate)。这背后的原理是通过springboot提供的 ActiveMQAutoConfiguration 来实现的。

@Configuration
@AutoConfigureBefore(JmsAutoConfiguration.class)
@AutoConfigureAfter({ JndiConnectionFactoryAutoConfiguration.class })
@ConditionalOnClass({ ConnectionFactory.class, ActiveMQConnectionFactory.class })
@ConditionalOnMissingBean(ConnectionFactory.class)
@EnableConfigurationProperties(ActiveMQProperties.class)
@Import({ ActiveMQXAConnectionFactoryConfiguration.class,
truetrueActiveMQConnectionFactoryConfiguration.class })
public class ActiveMQAutoConfiguration {

}

ActiveMQAutoConfiguration 的代码能得知,只要你的classpath里面存在 ConnectionFactory.class和ActiveMQConnectionFactory.class 并且容器里面没有类型为 ConnectionFactory.class 的Bean,那么该自动配置组件就会生效。

通过 ActiveMQAutoConfiguration ,我们在spring容器中就能自动获取一个类型为 ConnectionFactory.class 的Bean 和 JmsTemplate.class 的Bean。

发送消息

@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles(profiles = {"dev"})
public class TestSendAmqMsg {

    @Resource
    private JmsTemplate jmsTemplate;
    
    @Test
    public void testSendMsgDefault() throws Exception {
        jmsTemplate.convertAndSend("java");
        //等待消费,因为是自发自消费
        Thread.sleep(1000 * 20);
    }
}

上面的代码其实是不能正常工作的。原因是 jmsTemplate.convertAndSend 没有指定 Destination

Destination 的指定有两种方式,一种是通过方法参数指定。如下所示:

jmsTemplate.convertAndSend("hello-jms-queue", "java");

一种是通过在application.properties文件中指定一个默认值:

spring.jms.template.default-destination=hello-jms-default

还有一点需要注意的是 Destination 类型,是Topic还是Queue。默认是Queue。 Destination 类型也可以通过两种方式设置。

一种是通过在application.properties文件中指定一个默认值:

# false 表示是Queue
spring.jms.pub-sub-domain=false

一种是通过API

jmsTemplate.setPubSubDomain(false);

注意:上面的例子虽然能实现消息的发送和接收,但是非常有局限性。一个ActiveMQ上既有Topic也有Queue,我们通过 JmsTemplate 发送和消费消息时,最好是通过参数 Destination 来指定目的地,热不是一个字符串(不知道是具体是什么类型,只能通过全局配置)。

消费消息

有了上面的配置,我们可以有两种消费消息的方式。

JmsTemplate
@JmsListener

通过 @JmsListener 来实现消息消费,配置如下。

@Configuration
@EnableJms
public class JmsConfig {

    @JmsListener(containerFactory = "jmsListenerContainerFactory", destination = "hello-jms")
    public void consumerMsg(String msg) {
        System.out.println("############# Received message is : [" + msg + "]*************");
    }
}

@EnableJms 的作用是启用spring的Jms的注解驱动能力。注册了 JmsListenerAnnotationBeanPostProcessor Bean。原理如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(JmsBootstrapConfiguration.class)
public @interface EnableJms {
}

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class JmsBootstrapConfiguration {

@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public JmsListenerAnnotationBeanPostProcessor jmsListenerAnnotationProcessor() {
truetruereturn new JmsListenerAnnotationBeanPostProcessor();
true}

true@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
truepublic JmsListenerEndpointRegistry defaultJmsListenerEndpointRegistry() {
truetruereturn new JmsListenerEndpointRegistry();
true}

}

JmsAnnotationDrivenConfiguration 该配置类非常关键:

@Configuration
@ConditionalOnClass(EnableJms.class)
class JmsAnnotationDrivenConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public DefaultJmsListenerContainerFactoryConfigurer jmsListenerContainerFactoryConfigurer() {
        	DefaultJmsListenerContainerFactoryConfigurer configurer = new DefaultJmsListenerContainerFactoryConfigurer();
        	configurer.setDestinationResolver(this.destinationResolver.getIfUnique());
        	configurer.setTransactionManager(this.transactionManager.getIfUnique());
        	configurer.setMessageConverter(this.messageConverter.getIfUnique());
        	configurer.setJmsProperties(this.properties);
        	return configurer;
    }

true@Bean
true@ConditionalOnMissingBean(name = "jmsListenerContainerFactory")
truepublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
			DefaultJmsListenerContainerFactoryConfigurer configurer,
			ConnectionFactory connectionFactory) {
truetrueDefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
truetrueconfigurer.configure(factory, connectionFactory);
truetruereturn factory;
true}
}

配置JmsTemplate属性

spring.jms.template.default-destination=hello-jms-default
spring.jms.template.delivery-mode=non_persistent
spring.jms.template.priority=100
spring.jms.template.qos-enabled=true
spring.jms.template.time-to-live=50
# 设置消息延迟投递时间 需要jms 2.0 支持
#spring.jms.template.delivery-delay=1
spring.jms.template.receive-timeout=100

配置消费属性

# 消息消费
spring.jms.listener.acknowledge-mode=client
spring.jms.listener.auto-startup=true
spring.jms.listener.concurrency=10
spring.jms.listener.max-concurrency=20

连接池配置

通过上面的学习,我们已经能实现消息的发送和消费了。但是有一个问题就是,我们会和ActiveMQ Broker建立大量的短连接。在高并发下肯定是不可以的。通过在 application.properties 中简单配置,我们就能获得连接池能力。

添加依赖

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.14.3</version>
</dependency>

修改配置

# 只有配置了该选项,才会启用activemq连接池功能,参见:ActiveMQConnectionFactoryConfiguration
spring.activemq.pool.enabled=true
# 配置连接池参数
spring.activemq.pool.configuration.max-connections=10
spring.activemq.pool.configuration.idle-timeout=30000
spring.activemq.pool.configuration.expiry-timeout=0

spring.activemq.pool.configuration.create-connection-on-startup=false
spring.activemq.pool.configuration.time-between-expiration-check-millis=60000
spring.activemq.pool.configuration.maximum-active-session-per-connection=100
spring.activemq.pool.configuration.reconnect-on-exception=true
spring.activemq.pool.configuration.block-if-session-pool-is-full=true
spring.activemq.pool.configuration.block-if-session-pool-is-full-timeout=3000

连接池自动配置实现原理

@ConditionalOnClass(PooledConnectionFactory.class)
static class PooledConnectionFactoryConfiguration {

true@Bean(destroyMethod = "stop")
true@ConditionalOnProperty(prefix = "spring.activemq.pool", name = "enabled", havingValue = "true", matchIfMissing = false)
true@ConfigurationProperties(prefix = "spring.activemq.pool.configuration")
truepublic PooledConnectionFactory pooledJmsConnectionFactory(
			ActiveMQProperties properties) {
truetruePooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(
truetruetruetruenew ActiveMQConnectionFactoryFactory(properties)
truetruetruetruetruetrue.createConnectionFactory(ActiveMQConnectionFactory.class));

truetrueActiveMQProperties.Pool pool = properties.getPool();
truetruepooledConnectionFactory.setMaxConnections(pool.getMaxConnections());
truetruepooledConnectionFactory.setIdleTimeout(pool.getIdleTimeout());
truetruepooledConnectionFactory.setExpiryTimeout(pool.getExpiryTimeout());
truetruereturn pooledConnectionFactory;
true}
}

有了这个实现作为参考,如果我们不想使用springboot提供的 ActiveMQ 自动配置功能,我们自己写代码配置,也能实现连接池的功能,无非就是普通的 ActiveMQConnectionFactoryFactory 进行包装而已。

有一个细节需要注意:前置为 spring.activemq.pool.configuration 的配置属性是如何设置到 PooledConnectionFactory 的呢?但是是通过 ConfigurationPropertiesBindingPostProcessor 该类会处理注解 ConfigurationProperties 指定的属性,通过反射设置到生成的Bean中(在Bean初始化前)。

原文  https://leokongwq.github.io/2019/08/28/springboot-jms.html
正文到此结束
Loading...