最近想希望通过Node将目前由Java实现的例如邮件发送,短信发送,消息通知,数据计算结构到Node中
将Node整合为服务中台,这就碰到一个问题,如何将结构的服务集成到微服务的体系中
微服务体系中核心节点,为服务拆分提供了有效的支撑
常见的有zookeeper(CP),eureka(AP),nacos(AP/CP)
CAP原则又称CAP定理,指的是在一个分布式系统中, 一致性 (Consistency)、 可用性 (Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾
Node环境下将服务向注册中心注册还是比较简单的,引入官方依赖生成实例即可
/** * eureka注册模块 */ 'use script' const Eureka = require('eureka-js-client').Eureka module.exports = new Eureka({ logger: global.LOGGER, instance: { app: global.APPLICATION_CONFIG.application.name, // 服务名称 app hostName: global.APPLICATION_CONFIG.application.host, // 请求地址 localhost instanceId: `${global.APPLICATION_CONFIG.application.host}:${global.RUNTIME.port}`, // localhost:9000 ipAddr: global.APPLICATION_CONFIG.application.host, // IP地址 statusPageUrl: `http://${global.APPLICATION_CONFIG.application.host}:${global.RUNTIME.port}/status`, // status port: { // 端口, 必须这个格式 $: global.RUNTIME.port, '@enabled': 'true' }, // 向eureka注册的服务名 feign调用时使用 app vipAddress: global.APPLICATION_CONFIG.application.name, // 本地搭建使用MyOwn且指定class dataCenterInfo: { '@class': 'com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo', // 缺少会导致404错误 name: 'MyOwn' // 'Netflix' | 'Amazon' | 'MyOwn' } }, eureka: { // 如果开启了auth 需要加入用户名和密码 host: global.APPLICATION_CONFIG.eureka.host, // eureka地址,多个使用逗号分隔 port: global.APPLICATION_CONFIG.eureka.port, // 端口号 servicePath: global.APPLICATION_CONFIG.eureka.servicePath // 如果eureka没有更改过则是默认的 /eureka/apps } }) 复制代码
/** * nacos 注册服务 * 向nacos注册,订阅配置推送 * 通过events模块通知配置更新 */ 'use script' const { NacosNamingClient, NacosConfigClient } = require('nacos') const yaml = require('yamljs') const logger = global.LOGGER // 新建客户端 const client = new NacosNamingClient({ logger, serverList: global.APPLICATION_CONFIG.nacos['service-list'], namespace: global.APPLICATION_CONFIG.nacos.namespace }) client.ready() // 向nocos实例注册 client.registerInstance(global.APPLICATION_CONFIG.application.name, { ip: global.APPLICATION_CONFIG.application.host, port: global.RUNTIME.port }, global.APPLICATION_CONFIG.nacos.namespace) // 新建配置实例,接收推送 const config = new NacosConfigClient({ serverAddr: global.APPLICATION_CONFIG.nacos['service-list'], namespace: global.APPLICATION_CONFIG.nacos.namespace }) config.subscribe({ dataId: 'application-mercury-dev.yml', group: global.APPLICATION_CONFIG.nacos.namespace }, content => { global.LOGGER.info('<= Publish Config Received') const remoteConfig = yaml.parse(content) global.LOGGER.info(`=>${JSON.stringify(remoteConfig)}`) // 覆盖当前配置 Object.assign(global.APPLICATION_CONFIG, remoteConfig) // 推送 global.EVENT_BUS.emit('resource-update') }) exports.client = client exports.config = config 复制代码
消息队列当然也要整合,http调用肯定不能满足高并发,高负载的场景。MQ可以有效的提供缓冲与解构,贴一下RabbitMQ的实现
需要注意的事,接收到消息不一定只有单个内部模块使用,所以也需要考虑内部的订阅发布
/** * rabbit-mq 客户端封装 */ 'use script' const amqp = require('amqplib') module.exports = class RabbitMQ { /** * 构造器, 开启rabbit-mq的连接 * @param host * @param port * @param user * @param pass */ constructor (host = 'localhost', port = 5672, user = '***', pass = '***') { const self = this self.hosts = host self.consumer = [] amqp.connect({ hostname: host, port: port, username: user, password: pass }).then(conn => { global.LOGGER.info('<= RabbitMQ Connected') self.connect = conn self.connect.createChannel().then(channel => { global.LOGGER.info('<= Chanel Created') self.channel = channel self._consume() }) }).catch(e => { global.LOGGER.error(`<= RabbitMQ Connect Error ${e}`) }) self.subscriber = {} } /** * 添加消息队列监听 * @private */ _consume () { const distribute = (message) => { // 防止重复消费, 程序的错误将记录日志 try { const content = message.content.toString() global.LOGGER.info(`<= Recive Queue [${message.fields.routingKey}] Message ${content}`) this.subscriber[message.fields.routingKey].forEach(callback => { callback(JSON.parse(content)) }) } catch (e) { global.LOGGER.error(e) } finally { // 接受消息并且确认 this.channel.ack(message) } } this.consumer.forEach(consumer => { if (!this.subscriber[consumer.topic]) { this.channel.consume(consumer.topic, distribute) this.subscriber[consumer.topic] = [] } this.subscriber[consumer.topic].push(consumer.callback) }) } /** * 消费 * @param topic * @param callback */ subscribe (topic, callback) { this.consumer.push({ topic: topic, callback: callback }) } /** * 推送 * @param topic * @param message */ publish (topic, message) { const stringMessage = JSON.stringify(message) this.channel.sendToQueue(topic, Buffer.from(stringMessage)) global.LOGGER.info(`=> Publish Message ${stringMessage} `) } /** * 获取通信管道 * @returns {any} */ connection () { return this.channel } } 复制代码
如果希望能够让其他服务或者系统 快乐
的调用,还是需要提供SDK的,提供SDK的好处有
开箱即用,各种场景已经考虑到,并且依赖也已经整合
完善的测试,为上面的有点提供支持
entity
public class Mail { /** 发件人 */ private String from; /** 收件人 */ private String to; /** 抄送 */ private String cc; /** 秘抄 */ private String bcc; /** 主题 */ private String subject; /** 正文 */ private String text; /** HTML正文 */ private String html; public Mail() { } public Mail(String from, String to) { this.from = from; this.to = to; } public Mail(String from, String to, String subject) { this.from = from; this.to = to; this.subject = subject; } public Mail(String from, String to, String cc, String subject, String text) { this.from = from; this.to = to; this.cc = cc; this.subject = subject; this.text = text; } ... } 复制代码
@FeignClient(name = "thunder-mercury") public interface MercuryService extends IService { /** * 发送邮件 * * @param mail 邮件实体 * @return {@link BaseResponse} */ @PostMapping(value = "/v1/mail/") BaseResponse<String> send(@RequestBody Mail mail); ... } 复制代码