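I have read a lot of material online about integrating Spring with RabbitMQ and found most of it short on detail. I recently used this technique myself, so here is a detailed walkthrough of the process to share.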
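1. Start with a working Spring MVC demo. I won't cover that here; prepare one in advance. Note that the Spring version must not be too old, or you will get errors (I fell into this trap myself). This walkthrough uses Spring 4.2.3.RELEASE.
2. Install the RabbitMQ server. An earlier post of mine has a detailed tutorial: http://blog.csdn.net/l18637220680/article/details/75258280 .
3. Add the jar dependencies.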
<!-- RabbitMQ dependencies -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.6.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.0</version>
</dependency>
Create the rabbitmq-content.xml and rabbitmq.properties configuration files. The rabbitmq.properties file looks like this:
rabbit.hosts=192.168.1.239
rabbit.username=admin
rabbit.password=admin123
rabbit.port=5672
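Before wiring these keys into Spring placeholders, it can help to confirm that the file parses as expected. The following is a minimal sketch using only `java.util.Properties`; the class name `RabbitPropertiesCheck` and its helper methods are hypothetical and not part of the original project, and the property text is inlined so the sketch runs without a file on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class RabbitPropertiesCheck {

    // Same keys and values as rabbitmq.properties above, inlined for the sketch.
    public static final String SAMPLE =
            "rabbit.hosts=192.168.1.239\n"
          + "rabbit.username=admin\n"
          + "rabbit.password=admin123\n"
          + "rabbit.port=5672\n";

    /** Parses properties text into a Properties object. */
    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not parse properties", e);
        }
        return props;
    }

    /** Returns rabbit.port as an int, failing fast if it is missing or not numeric. */
    public static int port(Properties props) {
        String raw = props.getProperty("rabbit.port");
        if (raw == null) {
            throw new IllegalArgumentException("rabbit.port is missing");
        }
        // Properties keeps trailing whitespace in values, so trim before parsing.
        return Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println(props.getProperty("rabbit.hosts") + ":" + port(props));
    }
}
```

Trimming matters because a trailing space after a value in a properties file is kept as part of the value, which is an easy way to end up with an unparsable port.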
Import rabbitmq-content.xml and rabbitmq.properties in the Spring configuration file.
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <list>
            <value>classpath:/rabbitmq.properties</value>
        </list>
    </property>
</bean>
<!-- The resource name must match the actual file name, rabbitmq-content.xml -->
<import resource="classpath:/rabbitmq-content.xml" />
rabbitmq-content.xml is shown below. Pay attention to the XML namespace declarations and make sure none are missing.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util-3.0.xsd">

    <!-- Connection factory, used to create connections -->
    <rabbit:connection-factory id="connectionFactory"
        username="${rabbit.username}" password="${rabbit.password}"
        host="${rabbit.hosts}" port="${rabbit.port}" />

    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queue declaration
         durable: true or false; when true, the queue survives a broker restart
         exclusive: whether the queue is private to the current connection; if so, it is deleted automatically when the connection closes
         auto-delete: delete the queue automatically once no consumer is using it
    -->
    <!-- Queue for saving external-interface logs -->
    <rabbit:queue id="save_out_log" durable="true" auto-delete="false"
        exclusive="false" name="save_out_log" />

</beans>
The Java code follows. ProducerDao, the producer interface:
/**
 * MQ producer
 */
public interface ProducerDao {

    /**
     * Sends a message.
     * @param key routing key
     * @param obj message payload
     */
    public void sendData(String key, Object obj);
}
ProducerDaoImpl, the producer implementation:
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

/**
 * MQ producer implementation
 */
@Repository("producerDao")
public class ProducerDaoImpl implements ProducerDao {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendData(String key, Object obj) {
        // Publishes to the template's default exchange, using key as the routing key
        amqpTemplate.convertAndSend(key, obj);
    }
}
Add the following to rabbitmq-content.xml, inside the <beans> element.
<!-- Exchange, queue and binding key.
     The exchange name must match the exchange attribute of rabbit:template below. -->
<rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="mq-exchange">
    <rabbit:bindings>
        <rabbit:binding queue="save_out_log" key="save_out_log_key" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<!-- By default the template uses SimpleMessageConverter, and Spring AMQP also ships Jackson-based JSON converters.
     A custom converter is configured here so that producer data is stored in the queue as JSON;
     it serializes with fastjson, which the author chose over Jackson for speed. -->
<bean id="jsonMessageConverter" class="com.baicaiqiche.core.queue.dao.FastJsonMessageConverter"></bean>

<!-- Spring template declaration -->
<rabbit:template exchange="my-mq-exchange" id="amqpTemplate"
    connection-factory="connectionFactory" message-converter="jsonMessageConverter" />

<!-- Listener -->
<bean id="saveOutLogListenter" class="com.baicaiqiche.core.queue.dao.SaveOutLogListenter"></bean>

<!-- Listen on the external-interface log queue; acknowledge="manual" switches message acknowledgement to manual mode -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" transaction-size="5">
    <rabbit:listener queues="save_out_log" ref="saveOutLogListenter" />
</rabbit:listener-container>
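A direct exchange delivers a message only to queues whose binding key equals the message's routing key exactly. The following is a minimal stdlib-only sketch of that matching rule, not the broker's implementation; the class `DirectExchangeSketch` and its methods are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<String, List<String>>();

    /** Records a binding between a queue and a binding key. */
    public void bind(String queue, String bindingKey) {
        List<String> queues = bindings.get(bindingKey);
        if (queues == null) {
            queues = new ArrayList<String>();
            bindings.put(bindingKey, queues);
        }
        queues.add(queue);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    public List<String> route(String routingKey) {
        List<String> queues = bindings.get(routingKey);
        // No exact match means no queue receives the message.
        return queues == null ? new ArrayList<String>() : new ArrayList<String>(queues);
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("save_out_log", "save_out_log_key");

        System.out.println(exchange.route("save_out_log_key"));
        System.out.println(exchange.route("zx_black_risk_key"));
    }
}
```

With only the `save_out_log_key` binding declared, a message sent with any other routing key matches no queue. By default the broker drops such a message without reporting an error to the publisher, so the key passed to `sendData` has to match a declared binding key.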
FastJsonMessageConverter, the converter class:
import java.io.UnsupportedEncodingException;

import net.sf.ezmorph.object.DateMorpher;
import net.sf.json.JSONObject;
import net.sf.json.util.JSONUtils;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

/**
 * JSON conversion for RabbitMQ messages.
 * Serialization uses fastjson; deserialization into a bean uses json-lib (net.sf.json).
 */
public class FastJsonMessageConverter extends AbstractMessageConverter {

    public static final String DEFAULT_CHARSET = "UTF-8";

    private volatile String defaultCharset = DEFAULT_CHARSET;

    public FastJsonMessageConverter() {
        super();
    }

    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset : DEFAULT_CHARSET;
    }

    /** Not used in this demo: the listener reads the raw message body itself, so this returns null. */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return null;
    }

    /** Converts the JSON message body into a bean of the same class as t. */
    @SuppressWarnings("unchecked")
    public <T> T fromMessage(Message message, T t) {
        String json;
        try {
            json = new String(message.getBody(), defaultCharset);
        } catch (UnsupportedEncodingException e) {
            // Fail loudly instead of continuing with an empty string
            throw new MessageConversionException("Failed to decode Message content", e);
        }
        JSONUtils.getMorpherRegistry().registerMorpher(new DateMorpher(new String[] {
                "yyyy-MM-dd", "yyyy-MM-dd HH:mm:ss", "yyyy/MM/dd", "yyyy/MM/dd HH:mm:ss" }));
        return (T) JSONObject.toBean(JSONObject.fromObject(json), t.getClass());
    }

    /** Serializes the object to JSON and wraps it in a Message. */
    @Override
    protected Message createMessage(Object objectToConvert, MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = com.alibaba.fastjson.JSONObject.toJSONString(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);
    }
}
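The converter turns the JSON string into bytes with an explicit charset and reports the byte count as the content length. The sketch below, using only the JDK, shows why both sides should name the same charset and why the length is counted in bytes rather than characters. `CharsetRoundTrip` and its methods are hypothetical names, and the sample JSON is made up for illustration.

```java
import java.nio.charset.Charset;

public class CharsetRoundTrip {

    public static final Charset UTF8 = Charset.forName("UTF-8");

    /** Encodes a JSON string into the bytes that would form the message body. */
    public static byte[] encode(String json) {
        return json.getBytes(UTF8);
    }

    /** Decodes a message body using the same charset the producer used. */
    public static String decode(byte[] body) {
        return new String(body, UTF8);
    }

    public static void main(String[] args) {
        // Two Chinese characters, written as Unicode escapes so the source file encoding does not matter.
        String json = "{\"name\":\"\u65e5\u5fd7\"}";
        byte[] body = encode(json);

        // Character count and byte count differ once non-ASCII text is involved.
        System.out.println(json.length() + " chars, " + body.length + " bytes");
        System.out.println(decode(body).equals(json));
    }
}
```

Calling `new String(bytes)` without a charset falls back to the platform default, which may not be UTF-8 on every machine, so a consumer that reads the raw body is safer passing the charset explicitly.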
The queue listener, which implements ChannelAwareMessageListener:
import javax.annotation.Resource;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import com.alibaba.fastjson.JSONObject;
import com.baicaiqiche.core.credit.domain.OutputServiceLog;
import com.baicaiqiche.core.credit.service.OutputServiceLogService;
import com.rabbitmq.client.Channel;

/**
 * Queue listener
 */
public class SaveOutLogListenter implements ChannelAwareMessageListener {

    @Resource
    private OutputServiceLogService outputServiceLogService;

    @Override
    public void onMessage(Message msg, Channel channel) {
        try {
            System.out.println("-------------MQ--------------");
            System.out.println(msg.toString());
            // Receive the message, decoding with the same charset the converter used
            JSONObject str = JSONObject.parseObject(new String(msg.getBody(), "UTF-8"));
            // Business logic
            OutputServiceLog outputServiceLog = JSONObject.toJavaObject(str, OutputServiceLog.class);
            outputServiceLogService.save(outputServiceLog);
            // Acknowledge only this message (multiple=false); multiple=true would also
            // acknowledge earlier deliveries on this channel whose processing failed
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // The message stays unacknowledged and is requeued when the channel closes
            e.printStackTrace();
        }
    }
}
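In manual acknowledgement mode, the second argument of `basicAck` decides whether one delivery or every delivery up to that tag is acknowledged. The following is a stdlib-only simulation of those semantics on a single channel, offered as a sketch rather than the client library's implementation; `AckSketch` and its methods are hypothetical names.

```java
import java.util.TreeSet;

public class AckSketch {

    // Delivery tags handed to the consumer on this channel but not yet acknowledged
    private final TreeSet<Long> unacked = new TreeSet<Long>();

    /** Records that the broker delivered a message with this tag. */
    public void deliver(long tag) {
        unacked.add(tag);
    }

    /** Mirrors the meaning of basicAck(tag, multiple) for one channel. */
    public void ack(long tag, boolean multiple) {
        if (multiple) {
            // Acknowledges every outstanding delivery up to and including tag
            unacked.headSet(tag, true).clear();
        } else {
            // Acknowledges exactly one delivery
            unacked.remove(tag);
        }
    }

    /** Returns a copy of the tags still waiting for an acknowledgement. */
    public TreeSet<Long> unacked() {
        return new TreeSet<Long>(unacked);
    }

    public static void main(String[] args) {
        // Messages 1..3 arrive; processing of 2 fails, so it is never acknowledged.
        AckSketch single = new AckSketch();
        for (long tag = 1; tag <= 3; tag++) single.deliver(tag);
        single.ack(1, false);
        single.ack(3, false);
        System.out.println("multiple=false leaves " + single.unacked());

        AckSketch batch = new AckSketch();
        for (long tag = 1; tag <= 3; tag++) batch.deliver(tag);
        batch.ack(1, true);
        batch.ack(3, true);
        System.out.println("multiple=true leaves " + batch.unacked());
    }
}
```

In the second run the failed message 2 is acknowledged as a side effect of acknowledging message 3, so it is never redelivered. That is the main thing to weigh when choosing the flag in a listener that swallows exceptions.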
Inject producerDao into the calling class and call sendData from one of its methods.
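@Resource
private ProducerDao producerDao;

// Excerpt from the original project: bm and logger are defined elsewhere in that class and are not shown here.
public void test(String content) {
    Map<String, String> map = new HashMap<String, String>();
    map.put("phone", bm.get("phone").toString());
    map.put("idNo", bm.get("idNo").toString());
    // The routing key must equal a binding key declared on the exchange.
    // zx_black_risk_key comes from the original project; with only the binding
    // shown in this post, the matching key would be save_out_log_key.
    producerDao.sendData("zx_black_risk_key", map);
    logger.info(">>>>>>>>> data sent to MQ: " + map);
}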
Log output:
-------------MQ-------------- (Body:'{"idNo":"34112519930802095x","phone":" "}' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=my-mq-exchange, receivedRoutingKey=zx_black_risk_key, receivedDelay=null, deliveryTag=5, messageCount=0, consumerTag=amq.ctag-ivOWDpOkyIhOtinz9ICGPA, consumerQueue=zx_black_risk])
Since this code was cut out of a real project, feel free to point out any problems.