node-异步编程
Node 异步编程
有异步I/O,必有异步编程。在开始异步编程之前,先了解下函数式编程,因为他是JavaScript异步编程的基础。
函数式编程
高阶函数
在通常的语言中,函数的参数只接受基本的数据类型或者是对象引用,返回值也只是基本数据类型和对象引用。下面的代码为常规的参数传递和返回:1
2
3function test(x) {
return x;
}高阶函数则是可以把函数作为参数,或是将函数作为返回值的函数。
1
2
3
4
5function test(x) {
retrun function() {
retrun x;
};
}高阶函数可以将函数作为输入或返回值的变化看起来虽细小,但是对于C/C++语言而言,通过指针也可以达到相同的效果。但对于程序编写,高阶函数比普通的函数要灵活许多。
偏函数用法
所谓偏函数,就是固定一个函数的一个或多个参数,并返回一个新函数,新函数用于接收剩余的参数。下面我们以实例加以说明:1
2
3
4
5
6
7function isString(obj) {
return Object.prototype.toString.call(obj) == '[object String]';
}
function isFunction(obj) {
return Object.prototype.toString.call(obj) == '[object Function]';
}在JavaScript中进行类型判断时,我们通常会进行类似上述代码的方法定义。这段代码虽然不复杂,但是当我们需要重复定义一些相似的函数,如果以上述方式定义方法,将会有更多的isXXX(),这样代码就会显得冗余。
为了解决这个问题,我们可以引入一个新函数,这个新函数可以像工厂一样批量创建一些类似的函数。在下面的代码中,我们将通过isType()函数预先指定type的值,然后返回一个新的函数:
1
2
3
4
5
6
7function isType(type) {
return function (obj) {
return Object.prototype.toString.call(obj) == '[object ' + type + ']';
};
}
let isString = isType('String');
let isFunction = isType('Function');可以看出,引入isType()函数后,创建isString()、isFunction()函数就方便多了。这种通过指定部分参数来产生一个新的定制函数的形式就是偏函数。
异步编程的优势与难点
优势
Node是基于事件驱动的非阻塞I/O模型,非阻塞I/O可以使CPU和I/O并不相互依赖等待,让资源得到更好的利用。难点
异步编程跟传统同步编程是有很大差异的,同步可以很好解决的问题,但在同步中却变成了难点。难点 描述 解决 异常处理 无法利用try/catch/final捕获异常,也就是说对于回调抛出的异常,使用传统的同步抓取方法是捕获不到的 将异常作为回调函数的第一个实参传回,如果为空值,则表明异步调用没有异常抛出。这也要求我们在编程时去遵循一些原则:必须执行调用者传入的回调函数;正确传递回异常供调用者判断 函数嵌套过深 callback hell promise async/await 阻塞代码 Node没有sleep()来阻塞程序 使用setTimeout()阻塞程序,但是这个方案未必好。尽量使用异步编程完成业务需求 多线程编程 Node的js是单线程上执行的 child_process、cluster,通过多进程的方式调用操作系统的多线程 异步转同步 还是回调问题 async/await来将异步变同步
异步编程解决方案
目前,异步编程的主要解决方案有如下3种:
事件发布/订阅模式
事件监听器模式是一种广泛用于异步编程的模式,是回调函数的事件化,又称发布/订阅模式。
Node自身提供的events模块是发布/订阅模式的一个简单实现,Node中大部分模块都继承于它。这个模块具有addListener/on()、once()、removeListener/off()、removeAllListeners()和emit()等基本的事件监听模式的方法实现。实例代码如下:
1
2
3
4
5
6
7let EventEmitter = require('events');
const emitter = new EventEmitter();
emitter.on('event', function (msg) {
console.log('emitter event ->', msg)
});
emitter.emit('event', 'Hello World!');可以看出订阅事件就是一个高阶函数的应用。事件发布/订阅模式可以实现一个事件与多个回调函数的关联,这些函数又称为事件侦听器。 通过emit()发布事件后,消息会立即传递给当前事件的所有侦听器执行。侦听器可以很灵活的添加和删除,使得事件和具体处理逻辑之间可以很轻松的关联和解耦。
事件发布/订阅模式自身并无同步和异步调用的问题,但在Node中,emit()调用多半是伴随事件循环而异步触发的,所以我们说事件发布/订阅模式广泛应用于异步编程。
继承events模块
Node在util模块中封装了继承的方法,从而使实现一个继承EventEmiiter的类变得十分简单,示例代码如下:1
2
3
4
5
6
7let events = require('events');
let util = require('util');
function CustomEmitter() {
events.EventEmitter.call(this);
}
util.inherits(CustomEmitter, events.EventEmitter);利用事件队列解决雪崩问题
所谓雪崩问题,就是再高访问量、大并发量的情况下缓存失效的情景,此时大量的请求涌入数据库中,数据库无法同时承受如此大的查询请求,进而往前影响到网站整体的响应速度。
这时我们可以利用事件队列来解决雪崩问题。我们可以once()方法,将所有请求的回调都压入事件队列中,利用其执行一次就会将监视器移出的特点,保证每一个回调只会被执行一次。这个特性可以帮我们过滤一些重复的事件响应。
例如如下代码:
1
2
3
4
5function select(callback) {
db.select('SQL', function (results) {
callback(results);
});
}如果站点刚启动,这时缓存中是不存在数据的,而如果访问量巨大,同一句SQL会被发送到数据库中反复查询,进而影响服务性能。我们可以增加一个状态锁去改进下:
1
2
3
4
5
6
7
8
9
10let status = 'ready';
function select(callback) {
if ('ready' == status) {
status = 'pending';
db.select('SQL', function (results) {
status = 'ready';
callback(results);
});
}
}但是在种情景下,连续多次调用select()时,只有第一次调用是生效的,后续的select()是没有数据服务的,这个时候可以引入事件队列,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12let proxy = new events.EventEmitter();
let status = 'ready';
function select(callback) {
proxy.once('selected', callback);
if ('ready' == status) {
status = 'pending';
db.select('SQL', function (results) {
proxy.emit('selected', results);
status = 'ready';
});
}
}这样我们利用once(),使得对于相同的SQL语句,保证在同一个查询开始到结束的过程中永远只有一次。SQL在进行查询时,新到来的相同调用只需在队列中等待数据即可,一旦查询结束,得到的结果可以被这些调用共同使用。这种方式能节省重复的数据库调用产生的开销。由于Node单线程执行的原因,此处无需担心状态同步问题。这种方式其实也可以应用到其他远程调用的场景中,即使外部没有缓存策略,也能有效节省重复开销。
此处可能存在侦听器过多引发的警告,需要调用setMaxListenners(0)移除警告,或者设更大的阈值。
多异步之间的协作方案
一般而言,事件与侦听器的关系是一对多,但在异步编程中,也会出现事件与侦听器的关系是多对一的情况,也就是说一个业务逻辑可能依赖两个通过回调或事件传递的结果。前面提到的嵌套过深的原因即是如此。这里作者想通过原生代码解决“嵌套过深”的问题,这里以渲染页面所需要的模板读取、数据存取和本地化资源读取为例简单介绍下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23let count = 0;
let results = {};
function done(key, value) {
results[key] = value;
count++;
if (3 === count) {
// 渲染页面
render(results);
}
}
fs.readFile(template_path, 'utf-8', function (err, template) {
done('template', template);
});
db.query(sql, function (err, data) {
done('data', data);
});
l10n.get(function (err, resources) {
done('resources', resources);
});由于多个异步场景中回调函数的执行并不能保证顺序,且回调函数之间互相没有任何交集。所以需要借助一个第三方函数和第三方变量来处理异步协作的结果。通常,我们把这个用于检测次数的变量叫做哨兵变量。此处需要利用偏函数来处理哨兵变量和第三方函数的关系,我们把done函数改写下,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12function after(times, callback) {
let count = 0;
let results = {};
return function (key, value) {
results[key] = value;
count++;
if (times == count)
callback(results);
};
}
let done = after(3, render);上述方案实现了多对一的目的,如果业务继续增长,我们依然可以继续利用发布/订阅方式来完成多对多的方案,相关代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17let emitter = new events.EventEmitter();
let done = after(times, render);
events.on('done', done);
events.on('done', other);
fs.readFile(template_path, 'utf-8', function (err, template) {
emitter.emit('done','template', template);
});
db.query(sql, function (err, data) {
emitter.emit('done',('data', data);
});
l10n.get(function (err, resources) {
emitter.emit('done',('resources', resources);
});这种方案结合了前者用简单的偏函数完成多对一的收敛和事件订阅/发布模式中一对多的发散。
另外一种方案EventProxy, 这里不多说,自行参阅。
Promise/Deferred模式
典型的异步Promise/Deferred模式有Promises/A、Promises/B、Promises/D等,这里着重介绍下Promises/A。Promises/A提议对单个异步做出了这样的抽象定义,具体如下:
* Promise操作只会处在3种状态的一种:未完成、完成、失败
* Promise的状态只会出现从未完成态向完成态或失败态转化,不能逆反。完成态和失败态不能互相转化。
* Promise的状态一旦转化,将不能更改。
Promises/A的实现非常简单,一个Promise对象只要具备then()方法即可,但是对于then()方法,有以下简单的要求:
* 接受完成态、错误态的回调方法。在操作完成或出现错误时,将会调用对应方法
* 可选地支持progress事件回调作为第三个方法
* then()方法只接受function对象,其余对象将被忽略。
* then()方法继续返回Promises对象,以实现链式调用。
then()的方法定义:1
then(fulfilledHandler, errorHandler, progressHandler)
我们通过events模块来实现then(),如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20let events = require('events');
let util = require('util');
let Promise = function () {
events.EventEmitter.call(this)
};
util.inherits(Promise, events.EventEmitter);
Promise.prototype.then = function (fulfilledHandler, errHandler, progressHandler) {
if ('function' == typeof fulfilledHandler) {
this.once('success', fulfilledHandler);
}
if ('function' == typeof errHandler) {
this.once('error', errHandler);
}
if ('function' == typeof progressHandler) {
this.once('progress', progressHandler);
}
return this;
};这里看到then()方法所做饿事情就是将回调函数存放起来,为了完成整个流程,还需要触发执行这些回调函数的地方,实现这些功能的对象通常被称为Deferred,即延迟对象,实例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18let Deferred = function () {
this.state = 'unfulfilled';
this.promise = new Promise();
};
Deferred.prototype.resolve = function (obj) {
this.state = 'fulfilled';
this.promise.emit('success', obj);
};
Deferred.prototype.reject = function (err) {
this.state = 'failed';
this.promise.emit('error', err);
};
Deferred.prototype.progress = function (data) {
this.promise.emit('progress', data);
};利用Promises/A模式,我们可以对一个典型的对象进行封装,相关代码如下:
1
2
3
4
5
6
7
8
9
10
11// Http
res.setEncoding('utf-8');
res.on('data', function (d) {
console.log('data', d) ;
});
res.on('end', function () {
console.log('end') ;
});
res.on('error', function (err) {
console.log('error', err) ;
});上述代码可以转换为如下的简略格式:
1
2
3
4
5
6
7res.then(function() {
console.log('end') ;
}, function(err) {
console.log('error', err) ;
}, function(data) {
console.log('data', d) ;
});要实现上述格式,我们只需简单改造下,相关代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24let promiseify = function (res) {
let deferred = new Deferred();
let result = '';
res.on('data', function (data) {
result += data;
deferred.progress(data);
});
res.on('end', function () {
deferred.resolve(result);
});
res.on('error', function (err) {
deferred.resolve(err);
});
// 这里返回deferred.promise的目的是为了不让外部程序调用reslove()和reject()方法,更改内部状态的行为交由定义者处理
return deferred.promise;
};
promiseify(res).then(function () {
console.log('end');
}, function (err) {
console.log('error', err);
}, function (data) {
console.log('data', data);
});从上面的代码可以看出:
Deferred主要是用于内部,用于维护异步模型的状态;Promise则作用于外部,通过then()方法暴露给外部以添加自定义逻辑
与事件发布/订阅模式相比,Promise/Deferred模式的API接口和抽象模型都十分简洁。它将业务中不可以变的部分封装到Deferred中,将可变的部分交给了Promise。此时问题就来了,对于不同的场景,都需要封装和改造起Deferred部分,然后才能得到简洁的接口。如果场景不常用,封装花费的时间与带来的简洁并不一定划算。
流程控制库
- 尾触发与next
在用express中间件时,有一个next对象,如下:
1
2
3
4
5var app = express()
app.use(function (req, res, next) {
console.log('Time:', Date.now())
next();
})每个中间件传递请求对象、响应对象和尾触发函数,通过队列形成一个处理流。
- 尾触发与next
异步并发控制
- bagpipe
- async