node-异步编程

Node 异步编程

有异步I/O,必有异步编程。在开始异步编程之前,先了解下函数式编程,因为他是JavaScript异步编程的基础。

函数式编程

  • 高阶函数
    在通常的语言中,函数的参数只接受基本的数据类型或者是对象引用,返回值也只是基本数据类型和对象引用。下面的代码为常规的参数传递和返回:

    1
    2
    3
    function test(x) {
    return x;
    }

    高阶函数则是可以把函数作为参数,或是将函数作为返回值的函数。

    1
    2
    3
    4
    5
    function test(x) {
    retrun function() {
    retrun x;
    };
    }

    高阶函数可以将函数作为输入或返回值的变化看起来虽细小,但是对于C/C++语言而言,通过指针也可以达到相同的效果。但对于程序编写,高阶函数比普通的函数要灵活许多。

  • 偏函数用法
    所谓偏函数,就是固定一个函数的一个或多个参数,并返回一个新函数,新函数用于接收剩余的参数。下面我们以实例加以说明:

    1
    2
    3
    4
    5
    6
    7
    function isString(obj) {
    return Object.prototype.toString.call(obj) == '[object String]';
    }

    function isFunction(obj) {
    return Object.prototype.toString.call(obj) == '[object Function]';
    }

    在JavaScript中进行类型判断时,我们通常会进行类似上述代码的方法定义。这段代码虽然不复杂,但是当我们需要重复定义一些相似的函数,如果以上述方式定义方法,将会有更多的isXXX(),这样代码就会显得冗余。

    为了解决这个问题,我们可以引入一个新函数,这个新函数可以像工厂一样批量创建一些类似的函数。在下面的代码中,我们将通过isType()函数预先指定type的值,然后返回一个新的函数:

    1
    2
    3
    4
    5
    6
    7
    function isType(type) {
    return function (obj) {
    return Object.prototype.toString.call(obj) == '[object ' + type + ']';
    };
    }
    let isString = isType('String');
    let isFunction = isType('Function');

    可以看出,引入isType()函数后,创建isString()、isFunction()函数就方便多了。这种通过指定部分参数来产生一个新的定制函数的形式就是偏函数

异步编程的优势与难点

  • 优势
    Node是基于事件驱动的非阻塞I/O模型,非阻塞I/O可以使CPU和I/O并不相互依赖等待,让资源得到更好的利用。

  • 难点
    异步编程跟传统同步编程是有很大差异的,同步可以很好解决的问题,但在同步中却变成了难点。

    难点 描述 解决
    异常处理 无法利用try/catch/final捕获异常,也就是说对于回调抛出的异常,使用传统的同步抓取方法是捕获不到的 将异常作为回调函数的第一个实参传回,如果为空值,则表明异步调用没有异常抛出。这也要求我们在编程时去遵循一些原则:必须执行调用者传入的回调函数;正确传递回异常供调用者判断
    函数嵌套过深 callback hell promise async/await
    阻塞代码 Node没有sleep()来阻塞程序 使用setTimeout()阻塞程序,但是这个方案未必好。尽量使用异步编程完成业务需求
    多线程编程 Node的js是单线程上执行的 child_process、cluster,通过多进程的方式调用操作系统的多线程
    异步转同步 还是回调问题 async/await来将异步变同步

异步编程解决方案

目前,异步编程的主要解决方案有如下3种:

  • 事件发布/订阅模式

    事件监听器模式是一种广泛用于异步编程的模式,是回调函数的事件化,又称发布/订阅模式。

    Node自身提供的events模块是发布/订阅模式的一个简单实现,Node中大部分模块都继承于它。这个模块具有addListener/on()、once()、removeListener/off()、removeAllListeners()和emit()等基本的事件监听模式的方法实现。实例代码如下:

    1
    2
    3
    4
    5
    6
    7
    let EventEmitter = require('events');

    const emitter = new EventEmitter();
    emitter.on('event', function (msg) {
    console.log('emitter event ->', msg)
    });
    emitter.emit('event', 'Hello World!');

    可以看出订阅事件就是一个高阶函数的应用。事件发布/订阅模式可以实现一个事件与多个回调函数的关联,这些函数又称为事件侦听器。 通过emit()发布事件后,消息会立即传递给当前事件的所有侦听器执行。侦听器可以很灵活的添加和删除,使得事件和具体处理逻辑之间可以很轻松的关联和解耦。

    事件发布/订阅模式自身并无同步和异步调用的问题,但在Node中,emit()调用多半是伴随事件循环而异步触发的,所以我们说事件发布/订阅模式广泛应用于异步编程。

    1. 继承events模块
      Node在util模块中封装了继承的方法,从而使实现一个继承EventEmiiter的类变得十分简单,示例代码如下:

      1
      2
      3
      4
      5
      6
      7
      let events = require('events');
      let util = require('util');

      function CustomEmitter() {
      events.EventEmitter.call(this);
      }
      util.inherits(CustomEmitter, events.EventEmitter);
    2. 利用事件队列解决雪崩问题

      所谓雪崩问题,就是再高访问量、大并发量的情况下缓存失效的情景,此时大量的请求涌入数据库中,数据库无法同时承受如此大的查询请求,进而往前影响到网站整体的响应速度。

      这时我们可以利用事件队列来解决雪崩问题。我们可以once()方法,将所有请求的回调都压入事件队列中,利用其执行一次就会将监视器移出的特点,保证每一个回调只会被执行一次。这个特性可以帮我们过滤一些重复的事件响应。

      例如如下代码:

      1
      2
      3
      4
      5
      function select(callback) {
      db.select('SQL', function (results) {
      callback(results);
      });
      }

      如果站点刚启动,这时缓存中是不存在数据的,而如果访问量巨大,同一句SQL会被发送到数据库中反复查询,进而影响服务性能。我们可以增加一个状态锁去改进下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      let status = 'ready';
      function select(callback) {
      if ('ready' == status) {
      status = 'pending';
      db.select('SQL', function (results) {
      status = 'ready';
      callback(results);
      });
      }
      }

      但是在种情景下,连续多次调用select()时,只有第一次调用是生效的,后续的select()是没有数据服务的,这个时候可以引入事件队列,代码如下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      let proxy = new events.EventEmitter();
      let status = 'ready';
      function select(callback) {
      proxy.once('selected', callback);
      if ('ready' == status) {
      status = 'pending';
      db.select('SQL', function (results) {
      proxy.emit('selected', results);
      status = 'ready';
      });
      }
      }

      这样我们利用once(),使得对于相同的SQL语句,保证在同一个查询开始到结束的过程中永远只有一次。SQL在进行查询时,新到来的相同调用只需在队列中等待数据即可,一旦查询结束,得到的结果可以被这些调用共同使用。这种方式能节省重复的数据库调用产生的开销。由于Node单线程执行的原因,此处无需担心状态同步问题。这种方式其实也可以应用到其他远程调用的场景中,即使外部没有缓存策略,也能有效节省重复开销。

      此处可能存在侦听器过多引发的警告,需要调用setMaxListenners(0)移除警告,或者设更大的阈值。

    3. 多异步之间的协作方案
      一般而言,事件与侦听器的关系是一对多,但在异步编程中,也会出现事件与侦听器的关系是多对一的情况,也就是说一个业务逻辑可能依赖两个通过回调或事件传递的结果。前面提到的嵌套过深的原因即是如此。

      这里作者想通过原生代码解决“嵌套过深”的问题,这里以渲染页面所需要的模板读取、数据存取和本地化资源读取为例简单介绍下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      let count = 0;
      let results = {};

      function done(key, value) {
      results[key] = value;
      count++;
      if (3 === count) {
      // 渲染页面
      render(results);
      }
      }

      fs.readFile(template_path, 'utf-8', function (err, template) {
      done('template', template);
      });

      db.query(sql, function (err, data) {
      done('data', data);
      });

      l10n.get(function (err, resources) {
      done('resources', resources);
      });

      由于多个异步场景中回调函数的执行并不能保证顺序,且回调函数之间互相没有任何交集。所以需要借助一个第三方函数和第三方变量来处理异步协作的结果。通常,我们把这个用于检测次数的变量叫做哨兵变量。此处需要利用偏函数来处理哨兵变量和第三方函数的关系,我们把done函数改写下,代码如下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      function after(times, callback) {
      let count = 0;
      let results = {};
      return function (key, value) {
      results[key] = value;
      count++;
      if (times == count)
      callback(results);
      };
      }

      let done = after(3, render);

      上述方案实现了多对一的目的,如果业务继续增长,我们依然可以继续利用发布/订阅方式来完成多对多的方案,相关代码如下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      let emitter = new events.EventEmitter();
      let done = after(times, render);

      events.on('done', done);
      events.on('done', other);

      fs.readFile(template_path, 'utf-8', function (err, template) {
      emitter.emit('done','template', template);
      });

      db.query(sql, function (err, data) {
      emitter.emit('done',('data', data);
      });

      l10n.get(function (err, resources) {
      emitter.emit('done',('resources', resources);
      });

      这种方案结合了前者用简单的偏函数完成多对一的收敛和事件订阅/发布模式中一对多的发散。

      另外一种方案EventProxy, 这里不多说,自行参阅。

  • Promise/Deferred模式
    典型的异步Promise/Deferred模式有Promises/A、Promises/B、Promises/D等,这里着重介绍下Promises/A。

    Promises/A提议对单个异步做出了这样的抽象定义,具体如下:
    * Promise操作只会处在3种状态的一种:未完成、完成、失败
    * Promise的状态只会出现从未完成态向完成态或失败态转化,不能逆反。完成态和失败态不能互相转化。
    * Promise的状态一旦转化,将不能更改。
    Promises/A的实现非常简单,一个Promise对象只要具备then()方法即可,但是对于then()方法,有以下简单的要求:
    * 接受完成态、错误态的回调方法。在操作完成或出现错误时,将会调用对应方法
    * 可选地支持progress事件回调作为第三个方法
    * then()方法只接受function对象,其余对象将被忽略。
    * then()方法继续返回Promises对象,以实现链式调用。
    then()的方法定义:

    1
    then(fulfilledHandler, errorHandler, progressHandler)

    我们通过events模块来实现then(),如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    let events = require('events');
    let util = require('util');
    let Promise = function () {
    events.EventEmitter.call(this)
    };

    util.inherits(Promise, events.EventEmitter);

    Promise.prototype.then = function (fulfilledHandler, errHandler, progressHandler) {
    if ('function' == typeof fulfilledHandler) {
    this.once('success', fulfilledHandler);
    }
    if ('function' == typeof errHandler) {
    this.once('error', errHandler);
    }
    if ('function' == typeof progressHandler) {
    this.once('progress', progressHandler);
    }
    return this;
    };

    这里看到then()方法所做饿事情就是将回调函数存放起来,为了完成整个流程,还需要触发执行这些回调函数的地方,实现这些功能的对象通常被称为Deferred,即延迟对象,实例代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    let Deferred = function () {
    this.state = 'unfulfilled';
    this.promise = new Promise();
    };

    Deferred.prototype.resolve = function (obj) {
    this.state = 'fulfilled';
    this.promise.emit('success', obj);
    };

    Deferred.prototype.reject = function (err) {
    this.state = 'failed';
    this.promise.emit('error', err);
    };

    Deferred.prototype.progress = function (data) {
    this.promise.emit('progress', data);
    };

    利用Promises/A模式,我们可以对一个典型的对象进行封装,相关代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // Http
    res.setEncoding('utf-8');
    res.on('data', function (d) {
    console.log('data', d) ;
    });
    res.on('end', function () {
    console.log('end') ;
    });
    res.on('error', function (err) {
    console.log('error', err) ;
    });

    上述代码可以转换为如下的简略格式:

    1
    2
    3
    4
    5
    6
    7
    res.then(function() {
    console.log('end') ;
    }, function(err) {
    console.log('error', err) ;
    }, function(data) {
    console.log('data', d) ;
    });

    要实现上述格式,我们只需简单改造下,相关代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    let promiseify = function (res) {
    let deferred = new Deferred();
    let result = '';
    res.on('data', function (data) {
    result += data;
    deferred.progress(data);
    });
    res.on('end', function () {
    deferred.resolve(result);
    });
    res.on('error', function (err) {
    deferred.resolve(err);
    });
    // 这里返回deferred.promise的目的是为了不让外部程序调用reslove()和reject()方法,更改内部状态的行为交由定义者处理
    return deferred.promise;
    };

    promiseify(res).then(function () {
    console.log('end');
    }, function (err) {
    console.log('error', err);
    }, function (data) {
    console.log('data', data);
    });

    从上面的代码可以看出:

    Deferred主要是用于内部,用于维护异步模型的状态;Promise则作用于外部,通过then()方法暴露给外部以添加自定义逻辑

    与事件发布/订阅模式相比,Promise/Deferred模式的API接口和抽象模型都十分简洁。它将业务中不可以变的部分封装到Deferred中,将可变的部分交给了Promise。此时问题就来了,对于不同的场景,都需要封装和改造起Deferred部分,然后才能得到简洁的接口。如果场景不常用,封装花费的时间与带来的简洁并不一定划算。

  • 流程控制库

    1. 尾触发与next
      在用express中间件时,有一个next对象,如下:
    1
    2
    3
    4
    5
    var app = express()
    app.use(function (req, res, next) {
    console.log('Time:', Date.now())
    next();
    })

    每个中间件传递请求对象、响应对象和尾触发函数,通过队列形成一个处理流。

    1. async
    2. setp
    3. wind

异步并发控制

  • bagpipe
  • async