下载ActiveMq的tar安装包, 解压到响应目录下, 使用bin目录下的./activemq start启动, ./activemq stop停止
配置类
@EnableJms @Configuration public class ActiveMQ4Config { @Bean public Queue queue(){ return new ActiveMQQueue("queue1"); } @Bean public RedeliveryPolicy redeliveryPolicy(){ RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy(); //是否在每次尝试重新发送失败后,增长这个等待时间 redeliveryPolicy.setUseExponentialBackOff(true); //重发次数,默认为6次 这里设置为10次 redeliveryPolicy.setMaximumRedeliveries(10); //重发时间间隔,默认为1秒 redeliveryPolicy.setInitialRedeliveryDelay(1); //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 /* 2毫秒,这里的2就是 value redeliveryPolicy.setBackOffMultiplier(2); //是否避免消息碰撞 redeliveryPolicy.setUseCollisionAvoidance(false); //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效 redeliveryPolicy.setMaximumRedeliveryDelay(-1); return redeliveryPolicy; } @Bean public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){ ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory( "admin", "admin", url); activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); return activeMQConnectionFactory; } @Bean(name="jmsQueueTemplate") public JmsTemplate jmsQueueTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) { //设置创建连接的工厂 //JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory); //优化连接工厂,这里应用缓存池 连接工厂就即可 JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory); //设置默认消费topic //jmsTemplate.setDefaultDestination(topic()); //设置P2P队列消息类型 jmsTemplate.setPubSubDomain(isPubSubDomain); DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique(); if (destinationResolver != null) { jmsTemplate.setDestinationResolver(destinationResolver); } MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique(); if (messageConverter != null) { jmsTemplate.setMessageConverter(messageConverter); } //deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false jmsTemplate.setExplicitQosEnabled(true); //DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 //定义持久化后节点挂掉以后,重启可以继续消费. jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT); //默认不开启事务 System.out.println("默认是否开启事务:"+jmsTemplate.isSessionTransacted()); //如果不启用事务,则会导致XA事务失效; //作为生产者如果需要支持事务,则需要配置SessionTransacted为true //jmsTemplate.setSessionTransacted(true); //消息的应答方式,需要手动确认,此时SessionTransacted必须被设置为false,且为Session.CLIENT_ACKNOWLEDGE模式 //Session.AUTO_ACKNOWLEDGE 消息自动签收 //Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收 //Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送 jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return jmsTemplate; } @Bean(name="jmsTopicTemplate") public JmsTemplate jmsTopicTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) { //设置创建连接的工厂 //JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); //优化连接工厂,这里应用缓存池 连接工厂就即可 JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory); //设置默认消费topic //jmsTemplate.setDefaultDestination(topic()); //设置发布订阅消息类型 jmsTemplate.setPubSubDomain(isPubSubDomain); //deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false jmsTemplate.setExplicitQosEnabled(true); //DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT); //默认不开启事务 System.out.println("是否开启事务"+jmsTemplate.isSessionTransacted()); //如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。 //jmsTemplate.setSessionTransacted(true); //不带事务的session的签收方式,取决于session的配置。 //默认消息确认方式为1,即AUTO_ACKNOWLEDGE System.out.println("是否消息确认方式"+jmsTemplate.getSessionAcknowledgeMode()); //消息的应答方式,需要手动确认,此时SessionTransacted必须被设置为false,且为Session.CLIENT_ACKNOWLEDGE模式 //Session.AUTO_ACKNOWLEDGE 消息自动签收 //Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收 //Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送 jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return jmsTemplate; } //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂 @Bean(name = "jmsQueueListener") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(activeMQConnectionFactory); //设置连接数 factory.setConcurrency("1-10"); //重连间隔时间 factory.setRecoveryInterval(1000L); factory.setSessionAcknowledgeMode(4); return factory; } }
消费者
@Component public class Consumer { private final static Logger logger = LoggerFactory.getLogger(Consumer.class); @JmsListener(destination = "queue1", containerFactory = "jmsQueueListener") public void receiveQueue(final TextMessage text, Session session)throws JMSException { try { logger.debug("Consumer收到的报文为:" + text.getText()); text.acknowledge();// 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发 } catch (Exception e) { session.recover();// 此不可省略 重发信息使用 } } }
生产者(不同的设置, 生产者和消费者要进行签收或者提交操作)
@Component public class Producter { @Autowired("..")//这里根据消息发布类型不同注入 private JmsTemplate jmsTemplate; @Autowired private Queue queue; @Autowired private Topic topic; //发送queue类型消息 public void sendQueueMsg(String msg){ jmsTemplate.convertAndSend(queue, msg); } //发送topic类型消息 public void sendTopicMsg(String msg){ jmsTemplate.convertAndSend(topic, msg); } }
延时投递的实现(其余高级特性实现方式类似)
broker配置文件schedulerSupport修改为true <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
@Service public class Producer { public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue"); @Autowired private JmsMessagingTemplate template; /** * 延时发送 * * @param destination 发送的队列 * @param data 发送的消息 * @param time 延迟时间 */ public <T extends Serializable> void delaySend(Destination destination, T data, Long time) { Connection connection = null; Session session = null; MessageProducer producer = null; // 获取连接工厂 ConnectionFactory connectionFactory = template.getConnectionFactory(); try { // 获取连接 connection = connectionFactory.createConnection(); connection.start(); // 获取session,true开启事务,false关闭事务 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 producer = session.createProducer(destination); producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue()); ObjectMessage message = session.createObjectMessage(data); //设置延迟时间 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); // 发送消息 producer.send(message); log.info("发送消息:{}", data); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } }
默认是异步发送消息, 这种消息效率更高, 但是会出现消息丢失, 但是有以下情况会发送同步消息 1.指定使用同步发送消息 2.在没有事务的前提下发送持久化消息
需要接收回调
// 创建一个消息队列 ActiveMqMessageProducer producer = (ActiveMqMessageProducer)session.createProducer(destination); ObjectMessage message = session.createObjectMessage(data); // 发送消息 producer.send(message, new AsyncCallback() { ... });
1. 什么情况下会导致消息的重试 . 客户端在使用事务的前提下, rollBack()或者没有commit()消息; . 未使用事务的前提下, 使用ACKNOWLEDGE模式, 进行了session.recover() 2. 重试多少次, 每次间隔 默认是6次, 间隔为1s 3. 超过重发的次数, 消息会被放入死信队列中
可以通过individualDeadLetterStrategy来设置各自的死信队列, 也可以设置过期
可以根据messageId来做校验, 可以使用redis来做