在引入一项技术之前,首先必须清楚的是该技术可以为项目解决什么问题。个人在了解消息队列(Message Queue)之前,以为消息队列主是用于发送短信、邮件等消息发送(异步解耦),但深入理解才发现自己的理解错了,MQ的作用不止体现在一些用户接收到的具体消息里,还可用于其它应用的数据发送、通用的业务处理等。 消息队列从字面上意思解读就是将消息存放到队列里,根据队列FIFO(先入先出)的特性进行消息消费。在实际开发中,是一种跨进程的通信机制,用于应用间的消息传递。
MQ的主要优点为 解耦 、 异步 、 削峰 ,以下举一个简单的场景来反应这几个特性。
在微服务项目中,一般会根据核心业务进行系统的垂直拆分再进行单独部署。在上图中,各系统在下单业务里主要负责的内容如下:
想象下以上场景没有MQ的的存在时创建订单流程中存在的问题:
任何事物都有两面性,虽然MQ可以给系统解决不少问题,但也会引入一些问题,如:
了解了MQ的一些特性后,再讨论下几个适合使用MQ的场景:
幂等性:对于同一操作的请求无论请求多少次结果都是一致的,在MQ中的具体体现为同一条消息无论发送都少次都会被消费一次。
由于网络抖动(延迟)的原因消息重复发送的问题是不可避免的,如果在消费端消费时没有做好消息的幂等性保证就有可能出现重复消费,导致同一条消息被多次消费、写库多次的情况。比较常见的做法是为消息添加一个唯一标识(ID),在消费时根据ID查询数据库是否存在该消息记录,如果不存在再插入消息,存在则不进行插入消费。当生成与消费时间间隔不长时,可使用Redis提高消息幂等性的效率,如:
关于消息ID:
如目前个人工作中负责的消息中心应用是基于MongoDB+RocketMQ的技术架构,MongoDB负责存储各个应用发送过来的消息(主要为Sms、Email等),每次消费前通过RocketMQ的Message ID查询Mongo保证消息幂等性避免重复消费,消费成功后更新DB中的消息状态。
以下便以一个基于MongoDB+RocketMQ+Eureka+Spring Cloud Config的技术框架并结合使用MQ中的问题搭建一个简单的消息中心项目案例,其中各组件在项目中的主要作用如下:
下图为该项目的应用关系模型:
消息中心应用:统一通用消息的业务处理应用,如短信发送、邮件发送、员工服务号推送等消息的处理 问卷应用:负责员工调查问卷的分发,在该例子中只是一个简单的消息发送测试应用 common:存放各应用通用类,如短信消息类(SmsMessage)、消息常量类 config-server-properties:配置中心的配置存放目录 由于该项目主要用于演示一些MQ的功能与使用中的问题解决方式,所以编码部分比较简单。
通用模块主要存放各应用通用类(如实体、常量、配置、功能等)。 MessageConstant:维护消息常量
public interface MessageConstant { interface System { String QUESTION = "QUESTION"; } interface Topic { String SMS_TOPIC = "rocketmq.topic.sms"; String SMS_TOPIC_TEMPLATE = "${rocketmq.topic.sms}"; String MAIL_TOPIC = "rocketmq.topic.mail"; String MAIL_TOPIC_TEMPLATE = "${rocketmq.topic.mail}"; } interface Producer { String SMS_GROUP_TEMPLATE = "${rocketmq.producer.group.sms}"; String MAIL_GROUP_TEMPLATE = "${rocketmq.producer.group.mail}"; } interface Consumer { String SMS_GROUP_TEMPLATE = "${rocketmq.consumer.group.sms}"; String MAIL_GROUP_TEMPLATE = "${rocketmq.consumer.group.mail}"; } } 复制代码
@Data @Accessors(chain = true) public abstract class BaseMessage implements Serializable { /** * 消息源系统:{@link io.wilson.common.message.constant.MessageConstant.System} */ private String system; } 复制代码
@EqualsAndHashCode(callSuper = true) @Data @Accessors(chain = true) @ToString(callSuper = true) public class SmsMessage extends BaseMessage { /** * 短信创建用户 */ private String createUserId; /** * 接收短信用户 */ private String toUserId; /** * 手机号码 */ private String mobile; /** * 短信内容 */ private String content; } 复制代码
消息中心在进行编码之前,需确认消息中心该如何进行消息的处理。该项目所处的业务环境是各应用可能都需要发送一些短信消息、邮件、服务号消息等,相同消息的业务处理是一致的,所以消息中心对消息接收消费的主要流程如下:
在该项目中,不同的消息类型存储在不同的Mongodb collection(同Mysql table概念),但共用一个消息日志类MessageLog:
@Data @Accessors(chain = true) public class MessageLog implements Serializable { private String msgId; /** * 发送方系统名称 {@link io.wilson.common.message.constant.MessageConstant} */ private String system; /** * 消息对象json字符串 */ private String msgContent; /** * 业务执行结果 */ private Boolean success; private LocalDateTime createTime; private LocalDateTime updateTime; /** * 初始化消息记录 * * @param message 消息 * @return */ public static <T extends BaseMessage> MessageLog convertFromMessage(T message) { LocalDateTime now = LocalDateTime.now(); return new MessageLog() .setSystem(message.getSystem()) .setSuccess(false) .setCreateTime(now) .setUpdateTime(now); } } 复制代码
在该消费流程设计与开发编码过程中个人考虑的核心点如下:
为了更好地展示消息中心中类之间的关系,描绘以下类图:
当一条短信消息发送到消息中心时,其消费流程如下图:
public interface BaseMessageService<T extends BaseMessage> { /** * 消费消息 * * @param message 消息 * @param consumeFunction 消费方法 */ default boolean consume(T message, Function<T, Boolean> consumeFunction) { return consumeFunction.apply(message); } } 复制代码
@Service public interface SmsMessageService extends BaseMessageService<SmsMessage> { /** * 发送单条短信消息 * * @param smsMessage * @return 业务处理结果 */ boolean sendSingle(SmsMessage smsMessage); } 复制代码
@Service @Slf4j public class SmsMessageServiceImpl implements SmsMessageService { @Override public boolean sendSingle(SmsMessage smsMessage) { // 短信业务操作结果 boolean isSuccess = true; /* * 短信业务操作并把操作结果设到isSuccess中 */ if (Objects.equals(smsMessage.getToUserId(), "Wilson")) { isSuccess = false; log.info("短信发送失败,消息内容:{}", smsMessage); } return isSuccess; } } 复制代码
public interface MessageLogConstant { /** * 各消息日志Mongo集合名 */ interface CollectionName { String SMS = "sms_message_log"; String MAIL = "mail_message_log"; } } 复制代码
@Slf4j public abstract class AbstractMQStoreListener { @Resource protected MongoTemplate mongoTemplate; /** * 判断消息是否已被消费 * * @param msgId * @return */ protected boolean isConsumed(String msgId) { long count = mongoTemplate.count(new Query(Criteria.where("msg_id").is(msgId)), collection()); if (count > 0) { log.info("消息{}已成功消费过,请勿重复投递!", msgId); return true; } return false; } /** * 当前消息的mongo collection名:{@link io.wilson.message.domain.constant.MessageLogConstant.CollectionName} * * @return 当前消息存储的collection名 */ protected abstract String collection(); /** * 保存消息消费记录 * * @param success 业务执行结果 * @param msgId 消息id * @param message */ void store(boolean success, String msgId, BaseMessage message) { MessageLog messageLog = MessageLog.convertFromMessage(message) .setMsgId(msgId) .setMsgContent(JSONObject.toJSONString(message)) .setSuccess(success); mongoTemplate.insert(messageLog, collection()); } } 复制代码
@Slf4j @Service @ConditionalOnProperty(MessageConstant.Topic.SMS_TOPIC) @RocketMQMessageListener(topic = MessageConstant.Topic.SMS_TOPIC_TEMPLATE, consumerGroup = MessageConstant.Consumer.SMS_GROUP_TEMPLATE) public class SmsMessageListener extends AbstractMQStoreListener implements RocketMQListener<MessageExt> { @Resource private SmsMessageService smsMessageService; private static final String EXCEPTION_FORMAT = "短信消息消费失败,消息内容:%s"; @Override public void onMessage(MessageExt message) { String msgId = message.getMsgId(); if (isConsumed(msgId)) { return; } SmsMessage smsMessage = JSONObject.parseObject(message.getBody(), SmsMessage.class); log.info("接收到短信消息{}:{}", msgId, smsMessage); /*if (Objects.equals(smsMessage.getToUserId(), "2020")) { log.error("消息{}消费失败", message.getMsgId()); // 抛出异常让RocketMQ重新投递消息重新消费 throw new MQConsumeException(String.format(EXCEPTION_FORMAT, smsMessage)); }*/ boolean isSuccess = smsMessageService.consume(smsMessage, smsMessageService::sendSingle); if (!isSuccess) { log.info("短信消息业务操作失败,消息id: {}", msgId); } // 保存消息消费记录 store(isSuccess, msgId, smsMessage); } @Override protected String collection() { return MessageLogConstant.CollectionName.SMS; } } 复制代码
@SpringBootApplication @EnableDiscoveryClient public class MessageCenterApplication { public static void main(String[] args) { SpringApplication.run(MessageCenterApplication.class, args); } } 复制代码
eureka: client: service-url: defaultZone: http://localhost:8000/eureka spring: cloud: config: discovery: enabled: true service-id: config-center # 资源文件名 profile: dev name: rocketmq 复制代码
@SpringBootTest(classes = MessageCenterApplication.class) @RunWith(SpringJUnit4ClassRunner.class) public class SmsSendTest { @Resource private RocketMQTemplate rocketMQTemplate; @Value(MessageConstant.Topic.SMS_TOPIC_TEMPLATE) private String smsTopic; @Test public void sendSms() { SmsMessage smsMessage = new SmsMessage(); smsMessage.setToUserId("13211") .setMobile("173333222") .setContent("测试短信消息") .setSystem(MessageConstant.System.QUESTION); rocketMQTemplate.send(smsTopic, MessageBuilder.withPayload(smsMessage).build()); } } 复制代码
主程序ConfigServerApplication
@SpringBootApplication @EnableDiscoveryClient @EnableConfigServer public class ConfigServerApplication { public static void main(String[] args) { SpringApplication.run(ConfigServerApplication.class, args); } } 复制代码
Spring Cloud配置文件bootstrap.yml:
spring: cloud: config: server: git: uri: https://gitee.com/Wilson-He/rocketmq-message-center-demo.git username: Wilson-He force-pull: true password: # 配置文件在uri下的目录 search-paths: /config-server-properties eureka: client: service-url: defaultZone: http://localhost:8000/eureka 复制代码
配置文件configs-server-properties/rocketmq-dev.properties:
rocketmq.name-server=127.0.0.1:9876 rocketmq.topic.sms=sms-topic rocketmq.producer.group.sms=sms-group rocketmq.consumer.group.sms=sms-group rocketmq.topic.mail=mail-topic rocketmq.producer.group.mail=mail-group rocketmq.consumer.group.mail=mail-group 复制代码
mqnamesrv -n 127.0.0.1:9876
, mqbroker -n 127.0.0.1:9876
localhost:8080/question/toUser?userId=xxx
进行消费测试,消息中心控制台打印出日志信息与Mongo sms_message_log成功新增了数据即项目搭建完成
该文章通过一个简单的项目例子演示了使用Spring Boot RocketMQ处理MQ常见问题的一些方式:
项目源码