本文继续上一篇定时任务中提到的邮件服务,简单讲解Spring Boot中如何使用MongoDB进行应用开发。
上文中提到的这个简易邮件系统大致设计思路如下:
1、发送邮件支持同步和异步发送两种
2、邮件使用MongDB进行持久化保存
3、异步发送,直接将邮件批量保存在MongoDB中,然后通过后台定时任务发送
4、同步发送,先调用Spring的发送邮件功能,接着将邮件批量保存至MongDB
5、不论同步还是异步,邮件发送失败,定时任务可配置为进行N次重试
MongoDB现在已经是应用比较广泛的文档型NoSQL产品,有不少公司都拿MongoDB来开发日志系统。随着MongoDB的不断迭代更新,据说最新版已经支持ACID和事务了。不过鉴于历史上MongoDB应用的一些问题,以及考虑到数据持久化和运维的要求,核心业务系统存储的技术选型要非常慎重。
MongoDB是由C++语言编写的一个基于分布式文件存储的开源数据库系统。MongoDB将数据存储为一个文档,数据结构由键值(key=>value)对组成。MongoDB 文档类似于 JSON 对象(也就是BSON,10gen开发的一个数据格式),字段值可以包含其他文档,数组及文档数组。主要优点可以概括如下:
(1)、SchemaLess,结构灵活,表结构更改非常自由,不用每次修改的时候都付出代价(想想RDBMS修改表结构要注意什么),适合业务快速迭代表结构非常不确定的场景,而且json和大多数的语言有天然的契合,还支持数组,嵌套文档等数据类型
(2)、自带高可用,自动主从切换(副本集)
(3)、自带水平分片(分片),内置了路由,配置管理,应用只要连接路由,对应用来说是透明的
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> mongodb
配置MongoDB连接串:
spring.data.mongodb.uri=mongodb://name:pass@ip:port/database?maxPoolSize=256
如果使用多台MongoDB数据库服务器,参考配置如下:
spring.data.mongodb.uri=mongodb://user:pwd@ip1:port1,ip2:port2/database?maxPoolSize=512
连接串的一般配置,可以参考:猛击 这里
环境搭建好了,下面就是着手编码了。
通常我们会有各种语言的MongoDB客户端,直接引入调用API。在Spring Boot中,直接使用MongoTemplate即可。
package com.power.demo.mongodb; import com.power.demo.domain.MailDO; import java.util.Date; import java.util.List; public interface MailDao { /** * 批量创建对象 * * @param entList */ void batchInsert(List<MailDO> entList); /** * 创建对象 * * @param ent */ void insert(MailDO ent); /** * 根据ID查询对象 * * @param mailId * @return */ MailDO findByMailId(Long mailId); /** * 查询一段时间范围内待发送的邮件 * * @param startTime 开始时间 * @param endTime 结束时间 * @return */ List<MailDO> findToSendList(Date startTime, Date endTime); /** * 更新 * * @param ent */ void update(MailDO ent); /** * 删除 * * @param mailId */ void delete(Long mailId); } MailDao
package com.power.demo.mongodb; import com.power.demo.common.AppConst; import com.power.demo.common.SendStatusType; import com.power.demo.domain.MailDO; import com.power.demo.util.CollectionHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.stereotype.Component; import java.util.Date; import java.util.List; @Component public class MailDaoImpl implements MailDao { @Autowired private MongoTemplate mongoTemplate; public void batchInsert(List<MailDO> entList) { //分组批量多次插入 每次2000条 List<List<MailDO>> groupList = CollectionHelper.spliceArrays(entList, AppConst.BATCH_RECORD_COUNT); for (List<MailDO> list : groupList) { mongoTemplate.insert(list, MailDO.class); } } public void insert(MailDO ent) { mongoTemplate.save(ent); } public MailDO findByMailId(Long mailId) { Query query = new Query(Criteria.where("mailId").is(mailId)); MailDO ent = mongoTemplate.findOne(query, MailDO.class); return ent; } /** * 查询一段时间范围内待发送的邮件 * * @param startTime 开始时间 * @param endTime 结束时间 * @return */ public List<MailDO> findToSendList(Date startTime, Date endTime) { Query query = new Query(Criteria.where("create_time").gte(startTime).lt(endTime) .and("has_delete").is(Boolean.FALSE) .and("send_status").ne(SendStatusType.SendSuccess.toString()) .and("retry_count").lt(AppConst.MAX_RETRY_COUNT)) //重试次数小于3的记录 .limit(AppConst.RECORD_COUNT); //每次取20条 List<MailDO> entList = mongoTemplate.find(query, MailDO.class); return entList; } public void update(MailDO ent) { Query query = new Query(Criteria.where("_id").is(ent.getMailId())); Update update = new Update() .set("send_status", ent.getSendStatus()) .set("retry_count", ent.getRetryCount()) .set("remark", ent.getRemark()) .set("modify_time", ent.getModifyTime()) .set("modify_user", ent.getModifyUser()); //更新查询返回结果集的第一条 mongoTemplate.updateFirst(query, update, MailDO.class); } public void delete(Long mailId) { Query query = new Query(Criteria.where("_id").is(mailId)); mongoTemplate.remove(query, MailDO.class); } } MailDaoImpl
实体MailDO这里只列举了我在实际开发应用中经常用到的字段,这个实体抽象如下:
package com.power.demo.domain; import lombok.Data; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; import org.springframework.data.mongodb.core.mapping.Field; import java.io.Serializable; import java.util.Date; @Data @Document(collection = "mailinfo") public class MailDO implements Serializable { private static final long serialVersionUID = 1L; //唯一主键 @Id @Field("mail_id") private String mailId; @Field("mail_no") private Long mailNo; //邮件类型 如:Text表示纯文本、HTML等 @Field("mail_type") private String mailType; //邮件发送人 @Field("from_address") private String fromAddress; //邮件接收人 @Field("to_address") private String toAddress; //CC邮件接收人 @Field("cc_address") private String ccAddress; //BC邮件接收人 @Field("bcc_address") private String bccAddress; //邮件标题 @Field("subject") private String subject; //邮件内容 @Field("mail_body") private String mailBody; //发送优先级 如:Normal表示普通 @Field("send_priority") private String sendPriority; //处理状态 如:SendWait表示等待发送 @Field("send_status") private String sendStatus; //是否有附件 @Field("has_attachment") private boolean hasAttatchment; //附件保存的绝对地址,如fastdfs返回的url @Field("attatchment_urls") private String[] attatchmentUrls; //客户端应用编号或名称 如:CRM、订单、财务、运营等 @Field("client_appid") private String clientAppId; //是否删除 @Field("has_delete") private boolean hasDelete; //发送次数 @Field("retry_count") private int retryCount; //创建时间 @Field("create_time") private Date createTime; //创建人 @Field("create_user") private String createUser; //更新时间 @Field("modify_time") private Date modifyTime; //更新人 @Field("modify_user") private String modifyUser; //备注 @Field("remark") private String remark; //扩展信息 @Field("extend_info") private String extendInfo; public String getMailId() { return mailId; } public void setMailId(String mailId) { this.mailId = mailId; } public Long getMailNo() { return mailNo; } public void setMailNo(Long mailNo) { this.mailNo = mailNo; } public String getMailType() { return mailType; } public void setMailType(String mailType) { this.mailType = mailType; } public String getFromAddress() { return fromAddress; } public void setFromAddress(String fromAddress) { this.fromAddress = fromAddress; } public String getToAddress() { return toAddress; } public void setToAddress(String toAddress) { this.toAddress = toAddress; } public String getCcAddress() { return ccAddress; } public void setCcAddress(String ccAddress) { this.ccAddress = ccAddress; } public String getBccAddress() { return bccAddress; } public void setBccAddress(String bccAddress) { this.bccAddress = bccAddress; } public String getSubject() { return subject; } public void setSubject(String subject) { this.subject = subject; } public String getMailBody() { return mailBody; } public void setMailBody(String mailBody) { this.mailBody = mailBody; } public String getSendPriority() { return sendPriority; } public void setSendPriority(String sendPriority) { this.sendPriority = sendPriority; } public String getSendStatus() { return sendStatus; } public void setSendStatus(String sendStatus) { this.sendStatus = sendStatus; } public boolean isHasAttatchment() { return hasAttatchment; } public void setHasAttatchment(boolean hasAttatchment) { this.hasAttatchment = hasAttatchment; } public String[] getAttatchmentUrls() { return attatchmentUrls; } public void setAttatchmentUrls(String[] attatchmentUrls) { this.attatchmentUrls = attatchmentUrls; } public String getClientAppId() { return clientAppId; } public void setClientAppId(String clientAppId) { this.clientAppId = clientAppId; } public boolean isHasDelete() { return hasDelete; } public void setHasDelete(boolean hasDelete) { this.hasDelete = hasDelete; } public int getRetryCount() { return retryCount; } public void setRetryCount(int retryCount) { this.retryCount = retryCount; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public String getCreateUser() { return createUser; } public void setCreateUser(String createUser) { this.createUser = createUser; } public Date getModifyTime() { return modifyTime; } public void setModifyTime(Date modifyTime) { this.modifyTime = modifyTime; } public String getModifyUser() { return modifyUser; } public void setModifyUser(String modifyUser) { this.modifyUser = modifyUser; } public String getRemark() { return remark; } public void setRemark(String remark) { this.remark = remark; } public String getExtendInfo() { return extendInfo; } public void setExtendInfo(String extendInfo) { this.extendInfo = extendInfo; } } MailDO
请大家注意实体上的注解,@Document(collection = "mailinfo")将会在文档数据库中创建一个mailinfo的表,@Id表示指定该字段为主键, @Field("mail_no")表面实体字段mailNo存在MongoDB中的字段名称为mail_no。
根据MongoDB官方文档介绍,如果在插入数据时没有指定主键,MongoDB会自动给插入行自动加上一个主键_id,MongoDB客户端把这个id类型称为ObjectId,看上去就是一个UUID。我们可以通过注解自己设置主键类型,但是根据实践,_id名称是无法改变的。@Id和 @Field("mail_id")表面看上去是我想创建一个mail_id为主键的表,但是实际主键只有_id而没有mail_id。当然,主键的类型不一定非要是UUID,可以是你自己根据业务生成的唯一流水号等等。
同时,还需要注意attatchment_urls这个字段,看上去数组也可以直接存进MongoDB中,毕竟SchemaLess曾经是MongoDB宣传过的比RDBMS最明显的优势之一。
package com.power.demo.apiservice.impl; import com.google.common.collect.Lists; import com.power.demo.apientity.request.BatchSendEmailRequest; import com.power.demo.apientity.response.BatchSendEmailResponse; import com.power.demo.apiservice.contract.MailApiService; import com.power.demo.common.*; import com.power.demo.domain.MailDO; import com.power.demo.entity.vo.MailVO; import com.power.demo.mongodb.MailDao; import com.power.demo.service.contract.MailService; import com.power.demo.util.ConfigUtil; import com.power.demo.util.FastMapperUtil; import com.power.demo.util.SerialNumberUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.Arrays; import java.util.Date; import java.util.List; @Component public class MailApiServiceImpl implements MailApiService { @Autowired private MailService mailService; @Autowired private MailDao mailDao; /** * 发送邮件 * * @param request 请求 * @return 发送失败的邮件 **/ public BatchSendEmailResponse batchSendEmail(BatchSendEmailRequest request) { BatchSendEmailResponse response = new BatchSendEmailResponse(); response.setSuccess(""); if (request == null) { response.setFail("请求为空"); } else if (request.getMailList() == null || request.getMailList().size() == 0) { response.setFail("待发送邮件为空"); } if (response.getIsOK() == false) { return response; } List<MailVO> failedMails = Lists.newArrayList();//没有处理成功的邮件 //构造邮件对象 List<MailVO> allMails = generateMails(request); failedMails = processSendMail(allMails); response.setFailMailList(failedMails); response.setSuccess(String.format("发送邮件提交成功,发送失败的记录为:%d", failedMails.size())); return response; } /** * 构造待发送邮件 特殊字段赋值 * * @param request 请求 * @return 发送失败的邮件 **/ private List<MailVO> generateMails(BatchSendEmailRequest request) { List<MailVO> allMails = Lists.newArrayList(); for (MailVO mail : request.getMailList()) { if (mail == null) { continue; } //默认字段赋值 mail.setCreateTime(new Date()); mail.setModifyTime(new Date()); mail.setRetryCount(0); mail.setHasDelete(false); mail.setMailNo(SerialNumberUtil.create()); if (StringUtils.isEmpty(mail.getMailType())) { mail.setMailType(MailType.TEXT.toString()); } else if (Arrays.stream(MailType.values()).filter(x -> x.toString().equalsIgnoreCase(mail.getMailType())).count() == 0) { mail.setMailType(MailType.TEXT.toString()); } if (StringUtils.isEmpty(mail.getSendStatus())) { mail.setSendStatus(SendStatusType.SendWait.toString()); } else if (Arrays.stream(SendStatusType.values()).filter(x -> x.toString().equalsIgnoreCase(mail.getSendStatus())).count() == 0) { mail.setSendStatus(SendStatusType.SendWait.toString()); } if (StringUtils.isEmpty(mail.getSendPriority())) { mail.setSendPriority(SendPriorityType.Normal.toString()); } else if (Arrays.stream(SendPriorityType.values()).filter(x -> x.toString().equalsIgnoreCase(mail.getSendPriority())).count() == 0) { mail.setSendPriority(SendPriorityType.Normal.toString()); } if (StringUtils.isEmpty(mail.getMailId())) { mail.setMailId(String.valueOf(SerialNumberUtil.create())); } if (StringUtils.isEmpty(mail.getFromAddress())) { String fromAddr = ConfigUtil.getConfigVal(AppField.MAIL_SENDER_ADDR); mail.setFromAddress(fromAddr); } allMails.add(mail); } return allMails; } /** * 处理邮件 * * @param allMails 所有邮件 * @return 发送失败的邮件 **/ private List<MailVO> processSendMail(List<MailVO> allMails) { List<MailVO> failedMails = Lists.newArrayList();//没有处理成功的邮件 List<MailVO> asyncMails = Lists.newArrayList();//待异步处理的邮件 for (MailVO mail : allMails) { if (mail.isSync() == false) { //异步处理 continue; } //同步调用 BizResult<String> bizResult = safeSendMail(mail);//发送邮件成功 if (bizResult.getIsOK() == true) { mail.setSendStatus(SendStatusType.SendSuccess.toString()); mail.setRemark("同步发送邮件成功"); } else { mail.setSendStatus(SendStatusType.SendFail.toString()); mail.setRemark(String.format("同步发送邮件失败:%s", bizResult.getMessage())); failedMails.add(mail); } } //批量保存邮件至MongoDB safeStoreMailList(allMails); return failedMails; } /** * 发送邮件 * * @param ent 邮件信息 * @return **/ private BizResult<String> safeSendMail(MailVO ent) { BizResult<String> bizSendResult = null; if (MailType.TEXT.toString().equalsIgnoreCase(ent.getMailType())) { bizSendResult = mailService.sendSimpleMail(ent); } else if (MailType.HTML.toString().equalsIgnoreCase(ent.getMailType())) { bizSendResult = mailService.sendHtmlMail(ent); } if (bizSendResult == null) { bizSendResult = new BizResult<>(false, AppConst.SUCCESS, "不支持的邮件类型"); } return bizSendResult; } /** * 批量保存邮件 * * @param entList 邮件信息列表 * @return **/ private boolean safeStoreMailList(List<MailVO> entList) { boolean isOK = storeMailList(entList); if (isOK == true) { return isOK; } for (int i = 1; i <= AppConst.MAX_RETRY_COUNT; i++) { try { Thread.sleep(100 * i); } catch (Exception te) { te.printStackTrace(); } isOK = storeMailList(entList); if (isOK == true) { break; } } return isOK; } /** * 存储邮件 * * @param entList 邮件信息列表 * @return **/ private boolean storeMailList(List<MailVO> entList) { boolean isOK = false; try { List<MailDO> dbEntList = Lists.newArrayList(); entList.forEach( x -> { MailDO dbEnt = FastMapperUtil.cloneObject(x, MailDO.class); dbEntList.add(dbEnt); } ); mailDao.batchInsert(dbEntList); isOK = true; } catch (Exception e) { e.printStackTrace(); } return isOK; } } MailApiServiceImpl
到这里,MongoDB的主要存储和查询就搞定了。
在上面的邮件接口API实现中,我们定义了邮件发送服务MailService,在Spring Boot中发送邮件也非常简单。
## 邮件配置 spring.mail.host=smtp.xxx.com //邮箱服务器地址 spring.mail.username=abc@xxx.com //用户名 spring.mail.password=123456 //密码 spring.mail.default-encoding=UTF-8 mail.sender.addr=abc@company.com //发送者邮箱 mailsetting
通过Spring的JavaMailSender对象,可以轻松实现邮件发送。
发送简单邮件代码:
/** * 发送简单文本邮件 * * @param ent 邮件信息 **/ public BizResult<String> sendSimpleMail(MailVO ent) { BizResult<String> bizResult = new BizResult<>(true, AppConst.SUCCESS); try { if (ent == null) { bizResult.setFail("邮件信息为空"); return bizResult; } if (StringUtils.isEmpty(ent.getToAddress())) { bizResult.setFail("简单邮件,接收人邮箱为空"); return bizResult; } //默认发件人设置 if (StringUtils.isEmpty(ent.getFromAddress())) { ent.setFromAddress(senderAddr); } SimpleMailMessage message = new SimpleMailMessage(); message.setFrom(ent.getFromAddress()); message.setTo(ent.getToAddress()); message.setCc(ent.getCcAddress()); message.setBcc(ent.getBccAddress()); message.setSubject(ent.getSubject()); message.setText(ent.getMailBody()); message.setSentDate(new Date()); mailSender.send(message); bizResult.setSuccess("简单邮件已经发送"); } catch (Exception e) { e.printStackTrace(); PowerLogger.error(String.format("发送简单邮件时发生异常:%s", e)); bizResult.setFail(String.format("发送简单邮件时发生异常:%s", e)); } finally { PowerLogger.info(String.format("简单邮件,发送结果:%s", SerializeUtil.Serialize(bizResult))); } return bizResult; } sendSimpleMail
同理,我们经常要发送带格式的HTML邮件,发送代码可以参考如下:
/** * 发送HTML邮件 * * @param ent 邮件信息 **/ public BizResult<String> sendHtmlMail(MailVO ent) { BizResult<String> bizResult = new BizResult<>(true, AppConst.SUCCESS); try { if (ent == null) { bizResult.setFail("邮件信息为空"); return bizResult; } if (StringUtils.isEmpty(ent.getToAddress())) { bizResult.setFail("HTML邮件,接收人邮箱为空"); return bizResult; } //默认发件人设置 if (StringUtils.isEmpty(ent.getFromAddress())) { ent.setFromAddress(senderAddr); } MimeMessage message = mailSender.createMimeMessage(); //true表示需要创建一个multipart message MimeMessageHelper helper = new MimeMessageHelper(message, true); helper.setFrom(ent.getFromAddress()); helper.setTo(ent.getToAddress()); helper.setCc(ent.getCcAddress()); helper.setBcc(ent.getBccAddress()); helper.setSubject(ent.getSubject()); helper.setText(ent.getMailBody(), true);//true表示是html邮件 helper.setSentDate(new Date()); //判断有无附件 循环添加附件 if (ent.isHasAttatchment() && ent.getAttatchmentUrls() != null) { for (String filePath : ent.getAttatchmentUrls()) { FileSystemResource file = new FileSystemResource(new File(filePath)); String fileName = filePath.substring(filePath.lastIndexOf(File.separator)); helper.addAttachment(fileName, file); } } mailSender.send(message); bizResult.setSuccess("HTML邮件已经发送"); } catch (Exception e) { e.printStackTrace(); PowerLogger.error(String.format("发送HTML邮件时发生异常:%s", e)); bizResult.setFail(String.format("发送HTML邮件时发生异常:%s", e)); } finally { PowerLogger.info(String.format("HTML邮件,发送结果:%s", SerializeUtil.Serialize(bizResult))); } return bizResult; } sendHtmlMail
邮件附件的处理,本文仅仅是简单示例,实际情况是通常都免不了要上传分布式文件系统,如FastDFS等,有空我会继续写一下Spring Boot和分布式文件系统的应用实践。
还记得上一篇文章里的定时任务发送邮件吗?贴一下MailServiceImpl下的补偿发送实现:
/** * 自动查询并发送邮件 * * @param startTime 开始时间 * @param endTime 结束时间 * @return **/ public void autoSend(Date startTime, Date endTime) { StopWatch watch = DateTimeUtil.StartNew(); List<MailDO> mailDOList = mailDao.findToSendList(startTime, endTime); for (MailDO dbEnt : mailDOList) { MailVO ent = FastMapperUtil.cloneObject(dbEnt, MailVO.class); BizResult<String> bizSendResult = null; if (MailType.TEXT.toString().equalsIgnoreCase(ent.getMailType())) { bizSendResult = sendSimpleMail(ent); } else if (MailType.HTML.toString().equalsIgnoreCase(ent.getMailType())) { bizSendResult = sendHtmlMail(ent); } if (bizSendResult == null) { bizSendResult = new BizResult<>(false, AppConst.SUCCESS, "不支持的邮件类型"); } if (bizSendResult.getIsOK() == true) { dbEnt.setSendStatus(SendStatusType.SendSuccess.toString()); } else { dbEnt.setSendStatus(SendStatusType.SendFail.toString()); } dbEnt.setRetryCount(dbEnt.getRetryCount() + 1);//重试次数+1 dbEnt.setRemark(SerializeUtil.Serialize(bizSendResult)); dbEnt.setModifyTime(new Date()); dbEnt.setModifyUser("QuartMailTask"); mailDao.update(dbEnt); } watch.stop(); PowerLogger.info(String.format("本次共处理记录数:%s,总耗时:%s", mailDOList.size(), watch.getTotalTimeMillis())); } 自动查询并发送邮件
这里贴出来的示例代码是线性的一个一个发送邮件,我们完全可以改造成多线程的并行处理方式来提升邮件发送处理能力。
MongoDB的默认最大连接数是100,不同的客户端有不同的实现,对于读多写多的应用,最大连接数可能成为瓶颈。
不过设置最大连接数也要注意内存开销,合理配置连接池maxPoolSize。
早期版本的MongoDB已经支持行级的事务,支持简单的行级操作原子性,单行的操作要么全部成功,要么全部失败。
MongoDB的 WiredTiger 引擎本身支持事务,官方在最新版本中,号称完全支持ACID和事务。
可以选取合适字段创建索引,和RDBMS一样,MongoDB的索引也有很多种,如:单字段索引、复合索引、多Key索引、哈希索引等。
在常见的查询字段上合理添加索引,或者定期归档数据,减少查询数据量,这些手段都可以有效提高查询速度。
还有一种非常常见的手段就是Sharding,也就是数据库分片技术。当数据量比较大的时候,我们需要把数据分片运行在不同的机器中,以降低CPU、内存和IO的压力。MongoDB分片技术类似MySQL的水平切分和垂直切分,主要由两种方式做Sharding:垂直扩展和横向切分。垂直扩展的方式就是进行集群扩展,添加更多的CPU,内存,磁盘空间等。横向切分则是通过数据分片的方式,通过集群统一提供服务。
真正的高可用系统,很少有单实例的应用形态存在。
MongoDB支持主从方式、双机双工方式(互备互援)和集群工作方式(多服务器互备方式),减少单点出故障的可能。
参考:
<<MongoDB权威指南>>
https://docs.mongodb.com/
http://www.runoob.com/mongodb/mongodb-tutorial.html
https://www.cnblogs.com/binyue/p/5901328.html
https://yq.aliyun.com/articles/33726
https://yq.aliyun.com/articles/66623
http://www.cnblogs.com/l1pe1/p/7871790.html