本文主要研究一下nacos config的publishConfig
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java
@Controller @RequestMapping(Constants.CONFIG_CONTROLLER_PATH) public class ConfigController { private static final Logger log = LoggerFactory.getLogger(ConfigController.class); private static final String NAMESPACE_PUBLIC_KEY = "public"; public static final String EXPORT_CONFIG_FILE_NAME = "nacos_config_export_"; public static final String EXPORT_CONFIG_FILE_NAME_EXT = ".zip"; public static final String EXPORT_CONFIG_FILE_NAME_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; private final transient ConfigServletInner inner; private final transient PersistService persistService; private final transient ConfigSubService configSubService; @Autowired public ConfigController(ConfigServletInner configServletInner, PersistService persistService, ConfigSubService configSubService) { this.inner = configServletInner; this.persistService = persistService; this.configSubService = configSubService; } /** * 增加或更新非聚合数据。 * * @throws NacosException */ @RequestMapping(method = RequestMethod.POST) @ResponseBody public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam("content") String content, @RequestParam(value = "tag", required = false) String tag, @RequestParam(value = "appName", required = false) String appName, @RequestParam(value = "src_user", required = false) String srcUser, @RequestParam(value = "config_tags", required = false) String configTags, @RequestParam(value = "desc", required = false) String desc, @RequestParam(value = "use", required = false) String use, @RequestParam(value = "effect", required = false) String effect, @RequestParam(value = "type", required = false) String type, @RequestParam(value = "schema", required = false) String schema) throws NacosException { final String srcIp = RequestUtil.getRemoteIp(request); String requestIpApp = RequestUtil.getAppName(request); ParamUtils.checkParam(dataId, group, "datumId", content); ParamUtils.checkParam(tag); Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10); if (configTags != null) { configAdvanceInfo.put("config_tags", configTags); } if (desc != null) { configAdvanceInfo.put("desc", desc); } if (use != null) { configAdvanceInfo.put("use", use); } if (effect != null) { configAdvanceInfo.put("effect", effect); } if (type != null) { configAdvanceInfo.put("type", type); } if (schema != null) { configAdvanceInfo.put("schema", schema); } ParamUtils.checkParam(configAdvanceInfo); if (AggrWhitelist.isAggrDataId(dataId)) { log.warn("[aggr-conflict] {} attemp to publish single data, {}, {}", RequestUtil.getRemoteIp(request), dataId, group); throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr"); } final Timestamp time = TimeUtils.getCurrentTime(); String betaIps = request.getHeader("betaIps"); ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content); if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } } else { // beta publish persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; } //...... }
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/PersistService.java
@Repository public class PersistService { @Autowired private DynamicDataSource dynamicDataSource; private DataSourceService dataSourceService; //...... /** * 写入主表,插入或更新 */ public void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time, Map<String, Object> configAdvanceInfo, boolean notify) { try { addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify); } catch (DataIntegrityViolationException ive) { // 唯一性约束冲突 updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify); } } public void insertOrUpdateTag(final ConfigInfo configInfo, final String tag, final String srcIp, final String srcUser, final Timestamp time, final boolean notify) { try { addConfigInfo4Tag(configInfo, tag, srcIp, null, time, notify); } catch (DataIntegrityViolationException ive) { // 唯一性约束冲突 updateConfigInfo4Tag(configInfo, tag, srcIp, null, time, notify); } } public void insertOrUpdateBeta(final ConfigInfo configInfo, final String betaIps, final String srcIp, final String srcUser, final Timestamp time, final boolean notify) { try { addConfigInfo4Beta(configInfo, betaIps, srcIp, null, time, notify); } catch (DataIntegrityViolationException ive) { // 唯一性约束冲突 updateConfigInfo4Beta(configInfo, srcIp, null, time, notify); } } //...... /** * 添加普通配置信息,发布数据变更事件 */ public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo, final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) { tjt.execute(new TransactionCallback<Boolean>() { @Override public Boolean doInTransaction(TransactionStatus status) { try { long configId = addConfigInfoAtomic(srcIp, srcUser, configInfo, time, configAdvanceInfo); String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags"); addConfiTagsRelationAtomic(configId, configTags, configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()); insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I"); if (notify) { EventDispatcher.fireEvent( new ConfigDataChangeEvent(false, configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant(), time.getTime())); } } catch (CannotGetJdbcConnectionException e) { fatalLog.error("[db-error] " + e.toString(), e); throw e; } return Boolean.TRUE; } }); } /** * 更新配置信息 */ public void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser, final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) { tjt.execute(new TransactionCallback<Boolean>() { @Override public Boolean doInTransaction(TransactionStatus status) { try { ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()); String appNameTmp = oldConfigInfo.getAppName(); // 用户传过来的appName不为空,则用持久化用户的appName,否则用db的;清空appName的时候需要传空串 if (configInfo.getAppName() == null) { configInfo.setAppName(appNameTmp); } updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo); String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags"); if (configTags != null) { // 删除所有tag,然后再重新创建 removeTagByIdAtomic(oldConfigInfo.getId()); addConfiTagsRelationAtomic(oldConfigInfo.getId(), configTags, configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant()); } insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U"); if (notify) { EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant(), time.getTime())); } } catch (CannotGetJdbcConnectionException e) { fatalLog.error("[db-error] " + e.toString(), e); throw e; } return Boolean.TRUE; } }); } /** * 添加普通配置信息,发布数据变更事件 */ public void addConfigInfo4Tag(ConfigInfo configInfo, String tag, String srcIp, String srcUser, Timestamp time, boolean notify) { String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName(); String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant(); String tagTmp = StringUtils.isBlank(tag) ? StringUtils.EMPTY : tag.trim(); try { String md5 = MD5.getInstance().getMD5String(configInfo.getContent()); jt.update( "INSERT INTO config_info_tag(data_id,group_id,tenant_id,tag_id,app_name,content,md5,src_ip,src_user," + "gmt_create,gmt_modified) VALUES(?,?,?,?,?,?,?,?,?,?,?)", configInfo.getDataId(), configInfo.getGroup(), tenantTmp, tagTmp, appNameTmp, configInfo.getContent(), md5, srcIp, srcUser, time, time); if (notify) { EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, configInfo.getDataId(), configInfo.getGroup(), tenantTmp, tagTmp, time.getTime())); } } catch (CannotGetJdbcConnectionException e) { fatalLog.error("[db-error] " + e.toString(), e); throw e; } } /** * 更新配置信息 */ public void updateConfigInfo4Tag(ConfigInfo configInfo, String tag, String srcIp, String srcUser, Timestamp time, boolean notify) { String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName(); String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant(); String tagTmp = StringUtils.isBlank(tag) ? StringUtils.EMPTY : tag.trim(); try { String md5 = MD5.getInstance().getMD5String(configInfo.getContent()); jt.update( "UPDATE config_info_tag SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=? WHERE " + "data_id=? AND group_id=? AND tenant_id=? AND tag_id=?", configInfo.getContent(), md5, srcIp, srcUser, time, appNameTmp, configInfo.getDataId(), configInfo.getGroup(), tenantTmp, tagTmp); if (notify) { EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(), tenantTmp, tagTmp, time.getTime())); } } catch (CannotGetJdbcConnectionException e) { fatalLog.error("[db-error] " + e.toString(), e); throw e; } } /** * 添加普通配置信息,发布数据变更事件 */ public void addConfigInfo4Beta(ConfigInfo configInfo, String betaIps, String srcIp, String srcUser, Timestamp time, boolean notify) { String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName(); String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant(); try { String md5 = MD5.getInstance().getMD5String(configInfo.getContent()); jt.update( "INSERT INTO config_info_beta(data_id,group_id,tenant_id,app_name,content,md5,beta_ips,src_ip," + "src_user,gmt_create,gmt_modified) VALUES(?,?,?,?,?,?,?,?,?,?,?)", configInfo.getDataId(), configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(), md5, betaIps, srcIp, srcUser, time, time); if (notify) { EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(), tenantTmp, time.getTime())); } } catch (CannotGetJdbcConnectionException e) { fatalLog.error("[db-error] " + e.toString(), e); throw e; } } /** * 更新配置信息 */ public void updateConfigInfo4Beta(ConfigInfo configInfo, String srcIp, String srcUser, Timestamp time, boolean notify) { String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName(); String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant(); try { String md5 = MD5.getInstance().getMD5String(configInfo.getContent()); jt.update( "UPDATE config_info_beta SET content=?, md5 = ?, src_ip=?,src_user=?,gmt_modified=?,app_name=? WHERE " + "data_id=? AND group_id=? AND tenant_id=?", configInfo.getContent(), md5, srcIp, srcUser, time, appNameTmp, configInfo.getDataId(), configInfo.getGroup(), tenantTmp); if (notify) { EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, configInfo.getDataId(), configInfo.getGroup(), tenantTmp, time.getTime())); } } catch (CannotGetJdbcConnectionException e) { fatalLog.error("[db-error] " + e.toString(), e); throw e; } } //...... }