在进行微服务架构研发的过程中,不少研发人员都提出配置热加载的需求,一方面是为了提升研发效率,开发人员修改配置不需要重启服务;另一方面是为了线上紧急事件,可以快速恢复,例如线上数据库 down 了,可以紧急启动降级开关,通过配置热加载动态生效,降低处理事件的时间。于是我采用 rocketmq 消息队列,实现一套解耦的配置热加载机制。这里我将思路简单介绍下,希望大家能通过这篇文章,实现自己的配置热加载。
配置中心修改配置后,将修改的配置发送到 rocketmq。
微服务启动时,监控配置修改的topic,发现自身配置已修改,则发起配置热加载。
服务需要设置热加载开关,配置误修改可能带来风险。
1.1 RocketMQ消息设计
topic | tag | body |
---|---|---|
confCenter | confModify | 服务名称和配置内容 |
1.2 采用spring的event机制处理消息消息消费
RocketmqEvent 代码如下:
public class RocketmqEvent extends ApplicationEvent { private static final long serialVersionUID = -4468405250074063206L; private DefaultMQPushConsumer consumer; private List<MessageExt> msgs; public RocketmqEvent(List<MessageExt> msgs, DefaultMQPushConsumer consumer) throws Exception { super(msgs); this.consumer = consumer; this.setMsgs(msgs); } public String getMsg(int idx) { try { return new String(getMsgs().get(idx).getBody(), "utf-8"); } catch (UnsupportedEncodingException e) { return null; } } public String getMsg(int idx,String code) { try { return new String(getMsgs().get(idx).getBody(), code); } catch (UnsupportedEncodingException e) { return null; } } public DefaultMQPushConsumer getConsumer() { return consumer; } public void setConsumer(DefaultMQPushConsumer consumer) { this.consumer = consumer; } public MessageExt getMessageExt(int idx) { return getMsgs().get(idx); } public String getTopic(int idx) { return getMsgs().get(idx).getTopic(); } public String getTag(int idx) { return getMsgs().get(idx).getTags(); } public byte[] getBody(int idx) { return getMsgs().get(idx).getBody(); } public String getKeys(int idx) { return getMsgs().get(idx).getKeys(); } public List<MessageExt> getMsgs() { return msgs; } public void setMsgs(List<MessageExt> msgs) { this.msgs = msgs; } }
1.3 RocketMQ的message的Event处理
@Autowired ApplicationEventPublisher publisher publisher.publishEvent(new RocketmqEvent(msgs, consumer));
微服务端收到消息后,采用spring的RefreshScope机制进行配置热加载
1、需要注入 RefreshScope 和 ConfigurableWebApplicationContext 两个对象。
ConfigurableWebApplicationContext applicationContext;//采用ApplicationContextAware机制进行注入 @Autowired private RefreshScope refreshScope = null;
2、解析 RocketMQ 的 message,然后进行配置刷新。
@EventListener public void envListener(RocketmqEvent event) { event.getMsgs().forEach(msg -> { String body = new String(msg.getBody()); JSONObject json = JSON.parseObject(body); MutablePropertySources target = applicationContext.getEnvironment().getPropertySources();//获取Sources for (PropertySource<?> source : target) { if ("defaultProperties".equals(source.getName())) {//对默认配置进行刷新 @SuppressWarnings("unchecked") Map<String, String> map = (Map<String, String>) source.getSource(); Map<String, String> after = JSONObject.parseObject(json.toJSONString(), new TypeReference<Map<String, String>>() { }); map.putAll(after); PropertiesContent.refleshAll(after); this.refreshScope.refreshAll(); return; } } }); }
最后,这里只是简单介绍了配置热加载的思路,当然还有很多细节需要思考,如果有兴趣的可以一起交流交流。
作者简介:邓启翔,国信证券 zebra 微服务技术负责人,具有多年网络框架和分布式系统架构设计及研发经验,对消息中间件具有深刻理解。曾就职于平安、汇添富基金,资深架构师。