转载

Koa-spring: Node进程通信的实践

关于这篇文章

  • Node多进程
  • 进程通信
  • 闭包与立即执行

如果你感兴趣,可以fork项目,自己体验一下

Koa-spring : https://github.com/closertb/k...

related-client : https://github.com/closertb/k...

技术栈:koa + Sequelize + routing-controllers + typescript

Node多进程

进程与线程

  • 进程:是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础,进程是线程的容器(来自百科),进程是 资源分配 的最小单位。每个进程都拥有自己的独立空间地址、数据栈,一个进程无法访问另外一个进程里定义的变量、数据结构,只有建立了 IPC 通信,进程之间才可数据共享。
  • 线程:线程是操作系统能够进行 运算调度 的最小单位,首先我们要清楚线程是隶属于进程的,被包含于进程之中。一个线程只能隶属于一个进程,但是一个进程是可以拥有多个线程的。

这里只立个概念,有很多文章是讲Node的进程与线程的,这里推荐(很多文章年代久远,但都是精华):

  • Node.js 探秘:初识单线程的 Node.js
  • 当我们谈论 cluster 时我们在谈论什么(上)
  • 当我们谈论 cluster 时我们在谈论什么(下)
  • 另外就是推荐朴灵的:深入浅出NodeJs第九章

开始前,先强调三个点:

开启多进程不是为了解决高并发,主要是解决了单进程单线程模式下 Node.js CPU 利用率不足的情况,充分利用多核 CPU 的性能。

进程与进程之间内存是隔离的,所以数据不能共享,所以需要通信;

进程之间的通信是异步的;

进程通信

需求是这样的,在第一篇提到过鉴权中间件,系统登录后,我需要缓存用户的登录状态,下一次用户发起请求,根据缓存直接校验token是否有效,简单粗暴,依赖的库是memory-cache,没有上高大上的Redis。开始时我的系统是单进程的,操作简单:

  1. 用户登录时,缓存cache,cache.put(id, user, expirations);
  2. 鉴权时,根据请求携带的id和token, 校验是否和缓存中缓存的一致:token === cache.get(id).token;

简单粗暴高效,直到我为了程序更健壮,用cluster模式开启了多进程,显然,上面的方案不再奏效,进程与进程间的cache是分别存在不同的内存块的,所以,只有换方案,仍然没有上redis,而是考虑用进程间的通信来解决。下面是一个简单的示意图:

Koa-spring: Node进程通信的实践

简单来讲,就是利用主进程与工作进程之间的IPC通道进行通信,登录成功后,工作进程将鉴权信息发送给主进程,主进程进行存储。下一次请求发起鉴权时,工作进程给主进程发送一个读取缓存的消息,并开启监听;主进程监听到这个消息,并读取缓存中的鉴权信息,然后再发送给工作进程;工作进程收到后,判断携带的token是否和主进程缓存的token一致,一致则鉴权通过,部分示例代码:

// index.ts
const worker = cluster.fork();
//监听message事件
worker.on('message', (msg: ActionBody) => {
  const { type, payload } = msg;
  if(type == 'readCache') {
    const { uid, id } = payload;
    const res = cache.get(id) || {};
    worker.send({ type: 'sendCache', uid, payload: res })
  }
  if(type == 'saveCache') {
    cache.put(payload.id, payload, ExpiredTime);
  }
});

// userController.ts, 发起缓存鉴权信息消息
process.send({ type: 'saveCache', payload: res });

// AuthCheckMiddleWare.ts
function readCache(id: string) {
  return new Promise((res, rej) => {
    process.on('message', (m) = {
      if (m.uid === id) {
          res(m.payload);
      } else {
        rej({});
      }
    });
    process.send({ type: 'readCache', payload: { id, uid: id }});
  });
}


const user: any = await readCache(uid);
if(user && user.token === token) {
    ctx.user = user;
    await next();
} else {
  ctx.body = {
    code: 120001,
    message: uid ? '登录超时,请重新登录' : '请先登录',
    status: 'error'
  };
}

上面展示的是一个简易版的进程通信,在低并发请求时,运行是没有问题的。但如果你是老江湖,就会发现很多bug。

闭包与立即执行函数

上一节展示的代码,有多少bug呢?简单列举一下:

  • process.on重复订阅,即每发起一次鉴权,就会添加一次订阅。和浏览器的监听一样,如果重复去订阅监听,那就会重复响应;
  • 由于通信是异步的,所以发起的订阅在同一时间,并发量高一点,就会存在响应堆积,所以维护一个响应队列才是更好的做法;
  • 存在reject的情形,程序并没有相应的错误处理;
  • 会不会存在主进程一直未响应或工作进程未收到响应的情况?

所以基于以上问题,对readCache函数作了如下改动:

type Callback = (m: ActionBody) => boolean;

function generateUid() {
  const random = Math.floor(26 * Math.random() + 65);
  return `${Date.now()}-${String.fromCharCode(random)}`;
}

// 回调队列
const callbackList: Array<Callback> = [];

process.on('message', (m: ActionBody = { type: 'sendCache' }) => {
  let tempList = callbackList.slice();
  tempList.forEach((callback, i) => {
    callback = tempList[i];
    if (callback && callback(m)) {
      callbackList.splice(i, 1); // 成功处理了响应的或则响应已过期,移除这个回调;
    }
  });
});

function addCallback(callback: Callback) {
  callbackList.push(callback);
}

function readCache(id: string) {
  return new Promise((res, rej) => {
    try {
      const uid = generateUid();
      addCallback(((uid: string) => {
        let status = false;
        let timeout = setTimeout(() => { // 5秒超时读取,防止永久未回调,导致回调一直存在于回调列表中: 可能性很小
          rej({ message: '授权验证超时' });
          status = true;
        }, 5000);
        return (m: ActionBody) => {
        // 必须在过期前响应
        if (!status && m.uid === uid) {
          clearTimeout(timeout);  
          res(m.payload);
          return true;
        }
        return status;
      }
      })(uid));
      process.send({ type: 'readCache', payload: { id, uid }});
    } catch (error) {
      rej(error);
    }
  });
}

可以看到,针对上面提到的问题,作了如下几方面的改进:

  • 维护了一个callbackList响应回调列表;
  • 每次收到响应,对回调列表一一执行,响应成功的,从回调列表删除;
  • 生成唯一的uid,保证消息的发送与响应一一对应;
  • 利用setTimeout逻辑,来处理超时响应;

通信的优化就讲完了,但为什么和章节标题闭包与立即执行半毛钱关系没有。是有的,我又在闭包上跌了一跤,为了长记性,我故意用了这样一个标题。事情经过是这样的,开始添加回调函数不是上面那样写的,而是下面这样:

try {
  const uid = generateUid();
  addCallback((m: ActionBody) => {
    // 必须在过期前响应
    if (m.uid === uid) {
      res(m.payload);
      return true;
    }
    return false;
  }
  });
  process.send({ type: 'readCache', payload: { id, uid }});
}

看起来没什么毛病,无并发运行起来也没毛病。但当我加大并发量时,就会偶尔报权限错误,最后一排查,发现是m.uid === uid这一行的uid与期望的不一致,被篡改成了下一次请求生成的uid(抱头思考N分钟,哦,原来,闭包的锅)。解决闭包最好的方法是什么,立即执行函数,然后才有了上面的写法。如果对还原这个过程有兴趣,可以在主进程的响应里,添加一个500ms的延迟。

至此,我的这一次前端向Node服务端的探索告一段落,但好多想做的还没实现。愿下一次机会来临时,有能力让Graphql在我的服务中落地,用了Github的API V4后, 发现Graphql对架构要求极高,这能力怎么学,还得倒腾,倒腾。

系列索引

上篇: Koa-spring:后端太忙,让我自己写服务(上)

下篇: Koa-spring:后端太忙,让我自己写服务(下)

原文  https://segmentfault.com/a/1190000021343992
正文到此结束
Loading...