使用setImmediate或多进程解决Node.js的CPU-bound任务

@xingbofeng 2018-01-11 07:46:11发表于 xingbofeng/xingbofeng.github.io JavaScriptNode.js

本文配有demo,猛戳此处访问demo

大量占用CPU计算资源的任务称为CPU-bound的任务。

它的主要特点是CPU利用率较高,而不是I/O操作繁重。 让我们立即举一个例子上看看这些类型的任务在Node.js中的具体行为。

暴力求解子集求和问题

问题:类似于Leetcode-40. Combination Sum II

Given a collection of candidate numbers (C) and a target number (T), find all unique combinations in C where the candidate numbers sums to T.

Each number in C may only be used once in the combination.

Example:

For example, given candidate set [10, 1, 2, 7, 6, 1, 5] and target 8, A solution set is:

[
  [1, 7],
  [1, 2, 5],
  [2, 6],
  [1, 1, 6]
]

暴力算法就是一个递归算法,这里主要探究Node.js上的解决方案,就不去探究算法本身。

构建Demo本身

我们使用EventEmitter来构建这个demo:

"use strict";

const EventEmitter = require('events').EventEmitter;

class SubsetSum extends EventEmitter {
  /**
   * 构造函数,创建SubsetSum时调用
   * @param  {Number} sum 一个整数,期望的求和的整数
   * @param  {Array}  set 一个集合,被求解的集合
   */
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }

  /**
   * 递归地生成每一个可能的子集,而不把CPU控制权交还给事件循环。大量消耗CPU资源
   * @param  {Array} set    主集合
   * @param  {Array} subset 子集
   */
  _combine(set, subset) {
    for(let i = 0; i < set.length; i++) {
      let newSubset = subset.concat(set[i]);
      this._combine(set.slice(i + 1), newSubset);
      this._processSubset(newSubset);
    }
  }

  /**
   * 核对子集是否符合要求,一旦匹配到,则发出match事件
   * @param  {Array} subset 子集
   */
  _processSubset(subset) {
    console.log('Subset', ++this.totalSubsets, subset);
    const res = subset.reduce((prev, item) => (prev + item), 0);
    if(res == this.sum) {
      this.emit('match', subset);
    }
  }

  /**
   * 开始进行匹配,匹配完毕之后,发出end事件
   */
  start() {
    this._combine(this.set, []);
    this.emit('end');
  }
}

module.exports = SubsetSum;

为了说明CPU-bound任务造成的问题,创建一个HTTP服务器,对于网络请求作出响应:

"use strict";

const http = require('http');
const SubsetSum = require('./subsetSum');

http.createServer((req, res) => {
  const url = require('url').parse(req.url, true);
  if(url.pathname === '/subsetSum') {
    const data = JSON.parse(url.query.data);
    res.writeHead(200);
    const subsetSum = new SubsetSum(url.query.sum, data);
    subsetSum.on('match', match => {
      res.write('Match: ' + JSON.stringify(match) + '\n');
    });
    subsetSum.on('end', () => res.end());
    subsetSum.start();
  } else {
    res.writeHead(200);
    res.end('I\m alive!\n');
  }
}).listen(8000, () => console.log('Started'));

由于SubsetSum实例使用事件返回结果,所以我们可以在算法生成后立即对匹配的结果使用Stream进行处理。另一个需要注意的细节是,每次我们的服务器都会返回I'm alive!,这样我们每次发送一个不同于/subsetSum的请求的时候。可以用来检查我们服务器是否挂掉了,这在稍后将会看到。

开始运行:

node app

一旦服务器启动,我们准备发送我们的第一个请求;让我们尝试发送一组17个随机数,服务器将会一直处于阻塞状态的计算中,那么将会导致服务器处理一段时间:

curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]"--data-urlencode "sum=0"

这是如果我们在第一个请求仍在运行的时候在另一个终端中尝试输入以下命令,我们将发现一个巨大的问题:

curl -G http://localhost:8000

方案一:使用setImmediate

现在我们来看看这个模式如何应用于子集求和算法。 我们所要做的只是稍微修改一下subsetSum.js模块。 为方便起见,我们将创建一个名为subsetSumDefer.js的新模块,将原始的subsetSum类的代码作为起点。
我们要做的第一个改变是添加一个名为_combineInterleaved()的新方法,它是我们正在实现的模式的核心:

_combineInterleaved(set, subset) {
  this.runningCombine++;
  setImmediate(() => {
    this._combine(set, subset);
    if(--this.runningCombine === 0) {
      this.emit('end');
    }
  });
}

正如我们所看到的,我们所要做的只是使用setImmediate()调用原始的同步的_combine()方法。然而,现在的问题是因为该算法不再是同步的,我们更难以知道何时已经完成了所有的组合的计算。

为了解决这个问题,我们必须使用非常类似于我们在Chapter3-Asynchronous Control Flow Patterns with Callbacks看到的异步并行执行的模式来追溯_combine()方法的所有正在运行的实例。 当_combine()方法的所有实例都已经完成运行时,触发end事件,通知任何监听器,进程需要做的所有动作都已经完成。

对于最终子集求和算法的重构版本。首先,我们需要将_combine()方法中的递归步骤替换为异步:

_combine(set, subset) {
  for(let i = 0; i < set.length; i++) {
    let newSubset = subset.concat(set[i]);
    this._combineInterleaved(set.slice(i + 1), newSubset);
    this._processSubset(newSubset);
  }
}

通过上面的更改,我们确保算法的每个步骤都将使用setImmediate()在事件循环中排队,在事件循环队列中I / O请求之后执行,而不是同步运行造成阻塞。

另一个小调整是对于start()方法:

start() {
  this.runningCombine = 0;
  this._combineInterleaved(this.set, []);
}

在前面的代码中,我们将_combine()方法的运行实例的数量初始化为0.我们还通过调用_combineInterleaved()来将调用替换为_combine(),并移除了end的触发,因为现在_combineInterleaved()是异步处理的。
通过这个最后的改变,我们的子集求和算法现在应该能够通过事件循环可以运行的时间间隔交替地运行其可能大量占用CPU的代码,并且不会再造成阻塞。

最后更新app.js模块,以便它可以使用新版本的SubsetSum

const http = require('http');
// const SubsetSum = require('./subsetSum');
const SubsetSum = require('./subsetSumDefer');
http.createServer(function(req, res) {
  // ...
})
"use strict";

const EventEmitter = require('events').EventEmitter;

class SubsetSumDefer extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }

  /**
   * 增添一个_combineInterleaved方法,把计算过程放到setImmediate异步处理
   * @param  {Array} set
   * @param  {Array} subset
   */
  _combineInterleaved(set, subset) {
    this.runningCombine++;
    setImmediate(() => {
      this._combine(set, subset);
      if(--this.runningCombine === 0) {
        this.emit('end');
      }
    });
  }

  _combine(set, subset) {
    for(let i = 0; i < set.length; i++) {
      let newSubset = subset.concat(set[i]);
      // 替换到setImmediate
      this._combineInterleaved(set.slice(i + 1), newSubset);
      this._processSubset(newSubset);
    }
  }

  _processSubset(subset) {
    console.log('Subset', ++this.totalSubsets, subset);
    const res = subset.reduce((prev, item) => prev + item, 0);
    if(res == this.sum) {
      this.emit('match', subset);
    }
  }

  start() {
    // 设定一个计数器,计数器又归零的时候认为结束计算过程
    this.runningCombine = 0;
    this._combineInterleaved(this.set, []);
  }
}

module.exports = SubsetSumDefer;

方案二:实现一个进程池

先从构建processPool.js模块开始:

const fork = require('child_process').fork;
class ProcessPool {
  constructor(file, poolMax) {
      this.file = file;
      this.poolMax = poolMax; // 进程池最大进程数量
      this.pool = []; // 准备运行的进程
      this.active = []; // 正在运行的进程列表,比poolMax数量少
      this.waiting = []; // 任务队列
    } //...
}

在模块的第一部分,引入我们将用来创建新进程的child_process.fork()函数。 然后,我们定义ProcessPool的构造函数,该构造函数接受表示要运行的Node.js程序的文件参数以及池中运行的最大实例数poolMax作为参数。然后我们定义三个实例变量:

  • pool表示的是准备运行的进程
  • active表示的是当前正在运行的进程列表
  • waiting包含所有这些请求的任务队列,保存由于缺少可用的资源而无法立即实现的任务

ProcessPool类的acquire()方法,它负责取出一个准备好被使用的进程:

acquire(callback) {
  let worker;
  if(this.pool.length > 0) {  // [1]
    worker = this.pool.pop();
    this.active.push(worker);
    return process.nextTick(callback.bind(null, null, worker));
  }

  if(this.active.length >= this.poolMax) {  // [2]
    return this.waiting.push(callback);
  }

  worker = fork(this.file);  // [3]
  this.active.push(worker);
  process.nextTick(callback.bind(null, null, worker));
}

函数逻辑如下:

  1. 如果在进程池中有一个准备好被使用的进程,我们只需将其移动到active数组中,然后通过异步的方式调用其回调函数。
  2. 如果池中没有可用的进程,或者已经达到运行进程的最大数量,必须等待。通过把当前回调放入waiting数组。
  3. 如果我们还没有达到运行进程的最大数量,我们将使用child_process.fork()创建一个新的进程,将其添加到active列表中,然后调用其回调。

ProcessPool类的最后一个方法是release(),其目的是将一个进程放回进程池中:

release(worker) {
  if(this.waiting.length > 0) {  // [1]
    const waitingCallback = this.waiting.shift();
    waitingCallback(null, worker);
  }
  this.active = this.active.filter(w => worker !==  w);  // [2]
  this.pool.push(worker);
}

前面的代码也很简单,其解释如下:

  • 如果在waiting任务队列里面有任务需要被执行,我们只需为这个任务分配一个进程worker执行。
  • 否则,如果在waiting任务队列中都没有需要被执行的任务,我们则把active的进程列表中的进程放回进程池中。

正如我们所看到的,进程从来没有中断,只在为其不断地重新分配任务,使我们可以通过在每个请求不重新启动一个进程达到节省时间和空间的目的。然而,重要的是要注意,这可能并不总是最好的选择,这很大程度上取决于我们的应用程序的要求。为减少进程池长期占用内存,可能的调整如下:

  • 在一个进程空闲一段时间后,终止进程,释放内存空间。
  • 添加一个机制来终止或重启没有响应的或者崩溃了的进程。
"use strict";

const fork = require('child_process').fork;

class ProcessPool {
  constructor(file, poolMax) {
    this.file = file;
    this.poolMax = poolMax; // 进程池最大进程数量
    this.pool = []; // 准备运行的进程
    this.active = []; // 正在运行的进程列表,比poolMax数量少
    this.waiting = []; // 任务队列
  }

  /**
   * 取出一个进程,然后调用一个任务
   * @param  {Function} callback 
   */
  acquire(callback) {
    let worker;
    if(this.pool.length > 0) {  // 如果进程池中有准备好使用的进程可以取出,那就取出这个进程
      worker = this.pool.pop(); // 取出这个进程
      this.active.push(worker); // 把这个进程放入正在运行的进程列表
      return process.nextTick(callback.bind(null, null, worker)); // 在下一次Tick之前调用它的回调
    }

    if(this.active.length >= this.poolMax) {  // 如果进程池中没有准备好使用的进程,或者已经达到了最大进程运行数量,把这个任务放入等待队列
      return this.waiting.push(callback); // 放入等待队列
    }

    worker = fork(this.file);  // 否则,既不是进程池有进程可以取出,也不是达到最大进程数量,就创建一个进程
    this.active.push(worker); // 把当前进程放入正在运行的进程列表
    process.nextTick(callback.bind(null, null, worker)); // 在下一次Tick之前调用它的回调
  }

  /**
   * 释放正在运行的进程资源,把这个进程放回进程池
   * @param  {Object} worker 表示一个进程
   */
  release(worker) {
    if(this.waiting.length > 0) {  // 如果还有任务等待被执行,先把任务执行完毕
      const waitingCallback = this.waiting.shift(); // 取出等待队列的一个任务
      waitingCallback(null, worker); // 执行这个任务
    }
    this.active = this.active.filter(w => worker !==  w);  // 如果没有任务等待被执行了,则在正在运行的进程列表找到这个进程,并在这个列表删掉它
    this.pool.push(worker); // 并把这个进程放到进程池里
  }
}

module.exports = ProcessPool;
"use strict";

const fork = require('child_process').fork;

class ProcessPool {
  constructor(file, poolMax) {
    this.file = file;
    this.poolMax = poolMax; // 进程池最大进程数量
    this.pool = []; // 准备运行的进程
    this.active = []; // 正在运行的进程列表,比poolMax数量少
    this.waiting = []; // 任务队列
  }

  acquire(callback) {
    let worker;
    if(this.pool.length > 0) {  // 如果进程池中有准备好使用的进程可以取出,那就取出这个进程
      worker = this.pool.pop(); // 取出这个进程
      this.active.push(worker); // 把这个进程放入正在运行的进程列表
      return process.nextTick(callback.bind(null, null, worker)); // 在下一次Tick之前调用它的回调
    }

    if(this.active.length >= this.poolMax) {  // 如果进程池中没有准备好使用的进程,或者已经达到了最大进程运行数量,把这个任务放入等待队列
      return this.waiting.push(callback); // 放入等待队列
    }

    worker = fork(this.file);  // 否则,既不是进程池有进程可以取出,也不是达到最大进程数量,就创建一个进程
    this.active.push(worker); // 把当前进程放入正在运行的进程列表
    process.nextTick(callback.bind(null, null, worker)); // 在下一次Tick之前调用它的回调
  }

  release(worker) {
    if(this.waiting.length > 0) {  // 如果还有任务等待被执行,先把任务执行完毕
      const waitingCallback = this.waiting.shift(); // 取出等待队列的一个任务
      waitingCallback(null, worker); // 执行这个任务
    }
    this.active = this.active.filter(w => worker !==  w);  // 如果没有任务等待被执行了,则在正在运行的进程列表找到这个进程,并在这个列表删掉它
    this.pool.push(worker); // 并把这个进程放到进程池里
  }
}

module.exports = ProcessPool;

父子进程通信

现在我们的ProcessPool类已经准备就绪,我们可以使用它来实现SubsetSumFork模块,SubsetSumFork的作用是与子进程进行通信得到子集求和的结果。前面曾说到,用child_process.fork()启动一个进程也给了我们创建了一个简单的基于消息的管道,通过实现subsetSumFork.js模块来看看它是如何工作的:

"use strict";

const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPool');
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);

class SubsetSumFork extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
  }

  /**
   * 开始执行SubsetSum计算任务
   */
  start() {
    workers.acquire((err, worker) => { // 尝试从进程池获得一个新进程
      worker.send({sum: this.sum, set: this.set}); // 创建成功后,给子进程发送一条消息,包含当前需要计算的列表和总和

      const onMessage = msg => {
        if (msg.event === 'end') {  // 如果子进程发出的是end事件,则首先去除worker的监听器,再把当前进程放回进程池
          worker.removeListener('message', onMessage); // 移除事件监听器,节省内存
          workers.release(worker); // 放回内存
        }

        this.emit(msg.event, msg.data); // 其它事件则放出给外部监听
      };
      worker.on('message', onMessage); // 监听子进程的消息
    });
  }
}

module.exports = SubsetSumFork;

首先注意,我们在subsetSumWorker.js调用ProcessPool的构造函数创建ProcessPool实例。 我们还将进程池的最大容量设置为2

另外,我们试图维持原来的SubsetSum类相同的公共API。实际上,SubsetSumForkEventEmitter的子类,它的构造函数接受sumset,而start()方法则触发算法的执行,而这个SubsetSumFork实例运行在一个单独的进程上。调用start()方法时会发生的情况:

  1. 我们试图从进程池中获得一个新的子进程。在创建进程成功之后,我们尝试向子进程发送一条消息,包含sumsetsend()方法是Node.js自动提供给child_process.fork()创建的所有进程,这实际上与父子进程之间的通信管道有关。
  2. 然后我们开始监听子进程返回的任何消息,我们使用on()方法附加一个新的事件监听器(这也是所有以child_process.fork()创建的进程提供的通信通道的一部分)。
  3. 在事件监听器中,我们首先检查是否收到一个end事件,这意味着SubsetSum所有任务已经完成,在这种情况下,我们删除onMessage监听器并释放worker,并将其放回进程池中,不再让其占用内存资源和CPU资源。
  4. worker{event,data}格式生成消息,使得任何时候一旦子进程处理完毕任务,我们在外部都能接收到这一消息。

这就是SubsetSumFork模块现在我们来实现这个worker应用程序。

与父进程进行通信

现在我们来创建subsetSumWorker.js模块,我们的应用程序,这个模块的全部内容将在一个单独的进程中运行:

"use strict";

const SubsetSum = require('./subsetSum');

process.on('message', msg => {  // 子进程在收到父进程消息时执行回调,其实是对subsetSum的事件做转发
  const subsetSum = new SubsetSum(msg.sum, msg.set);
  
  subsetSum.on('match', data => {  // 如果收到来自subsetSum的match事件,则向父进程发送收到的数据
    process.send({event: 'match', data: data});
  });
  
  subsetSum.on('end', data => { // 如果收到来自subsetSum的end事件,则向父进程发送收到的数据
    process.send({event: 'end', data: data});
  });
  
  subsetSum.start();
});

由于我们的handler处于一个单独的进程中,我们不必担心这类CPU-bound任务阻塞事件循环,所有的HTTP请求将继续由主应用程序的事件循环处理,而不会中断。

当子进程开始启动时,父进程:

  1. 子进程立即开始监听来自父进程的消息。这可以通过process.on()函数轻松实现。我们期望从父进程中唯一的消息是为新的SubsetSum任务提供输入的消息。只要收到这样的消息,我们创建一个SubsetSum类的新实例,并注册matchend事件监听器。最后,我们用subsetSum.start()开始计算。
  2. 每次子集求和算法收到事件时,把结果它封装在格式为{event,data}的对象中,并将其发送给父进程。这些消息然后在subsetSumFork.js模块中处理,就像我们在前面的章节中看到的那样。

注意:当子进程不是Node.js进程时,则上述的通信管道就不可用了。在这种情况下,我们仍然可以通过在暴露于父进程的标准输入流和标准输出流之上实现我们自己的协议来建立父子进程通信的接口。