转载

activemq笔记

activemq笔记

activeMq的安装.启动和停止

下载ActiveMq的tar安装包, 解压到响应目录下, 使用bin目录下的./activemq start启动, ./activemq stop停止

activemq和spring-boot整合

配置类

@EnableJms
@Configuration
public class ActiveMQ4Config {

    @Bean
    public Queue queue(){
        return new ActiveMQQueue("queue1");
    }

    @Bean
    public RedeliveryPolicy redeliveryPolicy(){
        RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
        //是否在每次尝试重新发送失败后,增长这个等待时间
        redeliveryPolicy.setUseExponentialBackOff(true);
        //重发次数,默认为6次 这里设置为10次
        redeliveryPolicy.setMaximumRedeliveries(10);
        //重发时间间隔,默认为1秒
        redeliveryPolicy.setInitialRedeliveryDelay(1);
        //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 /* 2毫秒,这里的2就是 value
        redeliveryPolicy.setBackOffMultiplier(2);
        //是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(false);
        //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        return redeliveryPolicy;
    }

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){
        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(
            "admin",
            "admin",
            url);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }

    @Bean(name="jmsQueueTemplate")
    public JmsTemplate jmsQueueTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
        //设置创建连接的工厂
        //JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory);
        //优化连接工厂,这里应用缓存池 连接工厂就即可
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        //设置默认消费topic
       //jmsTemplate.setDefaultDestination(topic());
        //设置P2P队列消息类型
        jmsTemplate.setPubSubDomain(isPubSubDomain);
        DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique();
        if (destinationResolver != null) {
            jmsTemplate.setDestinationResolver(destinationResolver);
        }
        MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique();
        if (messageConverter != null) {
            jmsTemplate.setMessageConverter(messageConverter);
        }
        //deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
        jmsTemplate.setExplicitQosEnabled(true);
        //DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
        //定义持久化后节点挂掉以后,重启可以继续消费.
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        //默认不开启事务
        System.out.println("默认是否开启事务:"+jmsTemplate.isSessionTransacted());
        //如果不启用事务,则会导致XA事务失效;
        //作为生产者如果需要支持事务,则需要配置SessionTransacted为true
        //jmsTemplate.setSessionTransacted(true);
        //消息的应答方式,需要手动确认,此时SessionTransacted必须被设置为false,且为Session.CLIENT_ACKNOWLEDGE模式
        //Session.AUTO_ACKNOWLEDGE  消息自动签收
        //Session.CLIENT_ACKNOWLEDGE  客户端调用acknowledge方法手动签收
        //Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
        jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return jmsTemplate;
    }

    @Bean(name="jmsTopicTemplate")
    public JmsTemplate jmsTopicTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
        //设置创建连接的工厂
        //JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        //优化连接工厂,这里应用缓存池 连接工厂就即可
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        //设置默认消费topic
        //jmsTemplate.setDefaultDestination(topic());
        //设置发布订阅消息类型
        jmsTemplate.setPubSubDomain(isPubSubDomain);
        //deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
        jmsTemplate.setExplicitQosEnabled(true);
        //DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        //默认不开启事务
        System.out.println("是否开启事务"+jmsTemplate.isSessionTransacted());
        //如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。
        //jmsTemplate.setSessionTransacted(true);
        //不带事务的session的签收方式,取决于session的配置。
        //默认消息确认方式为1,即AUTO_ACKNOWLEDGE
        System.out.println("是否消息确认方式"+jmsTemplate.getSessionAcknowledgeMode());
        //消息的应答方式,需要手动确认,此时SessionTransacted必须被设置为false,且为Session.CLIENT_ACKNOWLEDGE模式
        //Session.AUTO_ACKNOWLEDGE  消息自动签收
        //Session.CLIENT_ACKNOWLEDGE  客户端调用acknowledge方法手动签收
        //Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
        jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return jmsTemplate;
    }

    //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        //设置连接数
        factory.setConcurrency("1-10");
        //重连间隔时间
        factory.setRecoveryInterval(1000L);
        factory.setSessionAcknowledgeMode(4);
        return factory;
    }
}

消费者

@Component
public class Consumer {
 
    private final static Logger logger = LoggerFactory.getLogger(Consumer.class);
    
    @JmsListener(destination = "queue1", containerFactory = "jmsQueueListener")
    public void receiveQueue(final TextMessage text, Session session)throws JMSException {
        try {
            logger.debug("Consumer收到的报文为:" + text.getText());
            text.acknowledge();// 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
        } catch (Exception e) {    
            session.recover();// 此不可省略 重发信息使用
        }
    }
}

生产者(不同的设置, 生产者和消费者要进行签收或者提交操作)

@Component
public class Producter {

    @Autowired("..")//这里根据消息发布类型不同注入
    private JmsTemplate jmsTemplate;
    @Autowired
    private Queue queue;
    @Autowired
    private Topic topic;

    //发送queue类型消息
    public void sendQueueMsg(String msg){
        jmsTemplate.convertAndSend(queue, msg);
    }

    //发送topic类型消息
    public void sendTopicMsg(String msg){
        jmsTemplate.convertAndSend(topic, msg);
    }
}

延时投递的实现(其余高级特性实现方式类似)

broker配置文件schedulerSupport修改为true
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
@Service
public class Producer {

    public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");

    @Autowired
    private JmsMessagingTemplate template;

    /**
     * 延时发送
     *
     * @param destination 发送的队列
     * @param data        发送的消息
     * @param time        延迟时间
     */
    public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // 获取连接工厂
        ConnectionFactory connectionFactory = template.getConnectionFactory();
        try {
            // 获取连接
            connection = connectionFactory.createConnection();
            connection.start();
            // 获取session,true开启事务,false关闭事务
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            ObjectMessage message = session.createObjectMessage(data);
            //设置延迟时间
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            // 发送消息
            producer.send(message);
            log.info("发送消息:{}", data);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

ActiveMQ消息异步投递

默认是异步发送消息, 这种消息效率更高, 但是会出现消息丢失, 但是有以下情况会发送同步消息
    1.指定使用同步发送消息
    2.在没有事务的前提下发送持久化消息

如何确保消息发送成功

需要接收回调

// 创建一个消息队列
ActiveMqMessageProducer producer = (ActiveMqMessageProducer)session.createProducer(destination);
ObjectMessage message = session.createObjectMessage(data);
// 发送消息
producer.send(message, new AsyncCallback() {
    ...
});

消息的重试机制

1. 什么情况下会导致消息的重试
    . 客户端在使用事务的前提下, rollBack()或者没有commit()消息;
    . 未使用事务的前提下, 使用ACKNOWLEDGE模式, 进行了session.recover()
2. 重试多少次, 每次间隔
    默认是6次, 间隔为1s
3. 超过重发的次数, 消息会被放入死信队列中

死信队列的处理

可以通过individualDeadLetterStrategy来设置各自的死信队列, 也可以设置过期

保证消息的幂等性

可以根据messageId来做校验, 可以使用redis来做
原文  https://segmentfault.com/a/1190000022656708
正文到此结束
Loading...