0%

EJ11

异步编程

计算机的中心,也就是执行组成我们程序独立的步的部分叫做处理器。我们到目前为止所见的程序将会使处理器一直运转知道它们完成工作。修改数字的循环以多快的速度执行极度依赖处理器的速度。

但是许多程序与处理器外的东西交互。例如,它们可能通过计算机网络通讯或者从硬盘上请求数据,这些操作远比从内存中获取慢得多。

当这样的事情发生时,让处理器停转是一件遗憾的事情,存在可以同时做的一些其他的工作。在某种程度上,这是通过操作系统处理的,操作系统将会在多个运行程序间切换处理器。但是当我们想要让单个程序在等待网络请求的同时能够继续运行操作系统就帮不上忙了。

异步性

在同步编程模型中,一次只能发生一件事。当你调用一个长时运行的函数时,只有当行为结束并且返回结果时他才会返回。当行为在进行时这阻塞了你的程序。

一个异步模型允许多个事情同时发生。当你开始一个行为的时候,你的程序继续运行。当行为结束时,程序被通知然后获取结果(例如,从硬盘读取数据)。

我们可以用一个小例子比较同步编程和异步编程:一个从网络获取两个资源的程序,随后将结果进行整合。

在同步环境下,请求函数只在它完成工作后才可返回,最简单的做这个人物的方式是一个挨着另一个的发起请求。缺点在于第二个请求只有在第一个结束之后才可以发起。总时间至少是两次响应时间的和。

这个问题的解,在一个同步系统中,是开一个额外的控制线程。一个线程(thread)是另一个运行的程序,通过操作系统,它的运行可能与其他程序交错进行,因为大多数现代计算机包含多个处理器,多个线程就可以同时在不同处理器运行。第二个线程可以发起第二个请求,然后两个线程等待响应,随后它们重新同步来组合结果。

在下面的图中,粗线代表程序正常运行的时间,细线代表等待网络响应的时间。在同步模型中,花费在网络上的时间是给定控制线程的时间轴的一部分。在异步模型中,开始一个网络行为概念上导致时间轴上一个分叉。开启行为的程序继续运行,并且行为和它一同进行,当行为完成时通知程序。

另一种描述区别的方式是,完成行为的等待在同步模型中是隐式的,而在异步模式中是显式的在我们控制下的。

异步使得表达不适合直线模型的控制更简单,但是也能让表达直线程序更加尴尬。我们随后将会看一些解决这种尴尬性的方式。

两个重要的JS编程平台,浏览器和Node.js,对于可能花费一些时间的操作采取异步策略,而不是依赖于线程。因为线程编程是出了名的困难(当一次做许多事情时理解一个程序做什么很困难),所以这通常被认为是个好事情。

乌鸦科技

大多数人知道乌鸦是非常小的鸟。它们可以使用工具,提前计划,记住事情,甚至在他们之间交流这些事情。

大多数人不知道的是它们能够做许多我们不知道的事情。我曾经被一个鸦科专家告诉乌鸦科技离人类科技不远,它们正在努力追赶。

例如,许多乌鸦文化有能力构建计算设备。这些不是像人类的计算设备的电子器件,而是通过小昆虫的行动来操作,一个和白蚁紧密相关的物种,已经与乌鸦建立了一种共生关系。鸟给它们提供食物,作为回报,昆虫构建并且操作它们的复杂的种群,在他们体内的生物的帮助下,进行计算。

这些种群通常坐落于大的可长期居住的巢穴。鸟和昆虫一起工作,在鸟巢的树枝之间去建立一个球状粘土结构的网络,昆虫在里面生活和工作。

为了和其他设备通信,这些机器使用光信号。乌鸦将一些反光材料嵌入到特殊的通信杆中,昆虫将这些材料用于反射另一个巢穴的光线,将数据编码为一个快速闪光序列。这意味着只有拥有完整视觉连接的巢穴才可以通信。

我们的鸦科专家绘制了罗纳河岸上的amby村庄的乌鸦巢穴网络图,这张图展示了巢穴和他们之间的连接。

在这个例子中,乌鸦计算机可以运行JS。在这一章中,我们将为他们写一些基本的网络功能。

回调

一个异步编程的方法是让执行缓慢操作的函数接受一个额外的参数,一个回调函数。该操作被启动,当完成时,回调函数被用这个结果进行调用。

作为一个例子,setTimeout函数等待给定的毫秒数(1秒是1000毫秒)然后调用一个函数。

1
setTimeout(() => console.log("Tick"), 500);

等待通常不是一种非常重要的工作类型,但是当更新动画时或者检查是否有某件事情比预期时间花费了更长时间是是很有用的。

在一行使用回调执行多个异步行为意味着你必须持续传递新的函数,在行为结束后处理计算过程的延续。

大多数的乌鸦巢穴计算机有一个持久存储数据的电灯泡,在这个灯泡里,信息被蚀刻到细枝上,以便之后可以检索到,蚀刻或者查找数据花费一些时间,所以持久存储的接口是异步的并且使用回调函数。

存储灯泡以JSON编码数据存储。一个乌鸦可能根据名字”food caches”来存储关于它隐藏食物的信息,可能是一个名字数组指向其他位置的数据,描述实际的贮藏地。为了在Big Oak的存储灯泡中查找一个食物贮藏地,一个乌鸦会运行这样的代码:

1
2
3
4
5
6
7
8
import {bigOak} from "./crow-tech";
// bigOak由附录可知为一个node实例,包含storage对象信息,从中读取的food caches为一个数组
bigOak.readStorage("food caches", caches => {
let firstCache = caches[0];
bigOak.readStorage(firstCache, info => {
console.log(info);
});
});

这种风格的编程是可行的,但是随着每一次异步行为缩进都会增加。如果做更复杂的如同时运行多个行为,可能有点不雅。

乌鸦巢穴计算机用请求-响应对进行通讯。那意味着一个巢穴向另一个巢穴发送消息,另一个巢穴再回复消息,确认收到并且可能包含对所问问题的回复。

每一个消息用一个类型来打标签,决定了它如何被处理。我们的代码可以为特定的请求类型定义处理程序,并且当这样的请求进来的时候,处理程序被调用产生一个回应。

被”./crow-tech”模块输出的接口提供基于回调的通信函数。巢穴有一个send方法来发送一个请求。它接受目标巢穴的名字,请求的类型以及请求的内容作为前面三个参数,还接受最后一个参数,它是一个函数并在响应回来的时候调用。

1
2
// 作为node实例的bigOak有send方法,接收目标节点,消息类型,消息内容和一个回调函数用于接收方处理完毕并发回响应时调用
bigOak.send("Cow Pasture", "note", "Let's caw loudly at 7PM", () => console.log("Note delivered."));

但是为了使得巢穴可以接受这种请求,我们首先必须定义名为note的请求类型。处理这个请求的代码要在所有可以接受这种类型的消息的巢穴上都能运行。我们将假设乌鸦到处飞然后在所有巢穴安装我们的处理程序代码。

1
2
3
4
5
6
7
// 网络实例上的defineRequestType方法,在network实例对象的types增加一个键值对,值为handler
import {defineRequestType} from "./crow-tech"
// 定义"note"类型,并表明接收方收到该类型的处理函数,与上面的send一脉相承,在附录的send实现中,我们要在一定概率下,在接收方调用handler函数,在这里就是首先打印xxx收到了xxx内容,然后调用done函数,而done函数就是在10ms后调用send提供的callback函数(如果有error作为第一个参数传给callback),有点点绕。。。需要结合附录源码理解
defineRequestType("note", (nest, content, source, done) => {
console.log(`${nest.name} received note: ${content}`);
done();
});

defineRequestType函数定义一种新类型的请求。例子添加了对”note”请求的支持,仅仅是为给定巢穴发送一个音符。我们的实现调用console.log如此我们可以确认请求到达了。巢穴有一个name属性保存着他们的名字。

在某种程度上,异步是可以传染的。任何调用异步函数的函数本身必须是异步的,使用一个回调或者一个相似的机制来传递结果。与简单地返回值相比,调用回调更加复杂,所以需要以这种方式大规模构造程序并不好。

Promises

和抽象的概念相处要比可以被值表示的概念更容易。就异步行为而言,代替为将来某一刻要调用的函数做安排,你可以返回一个代表这个将来事件的对象。

这就是标准中Promise类的用途。一个promise就是一个异步操作,可能在某个时刻完成并产生一个值。当这个值可以获取时能够通知任何对此感兴趣的人。

最简单的创建promise的方式就是调用Promise.resolve。这个函数确保你给的值被包装在一个promise中。如果它已经是一个promise了,就简单的返回,否则你得到一个新的promise,并立刻以你的值返回。

1
2
3
let fifteen = Promise.resolve(15);
fifteen.then(value => console.log(`Got ${value}`));
// → Got 15

为了得到promise的结果,你可以使用它的then方法。这注册了一个回调函数当promise解析成功并且产生一个值时调用。你可以为单个promise添加多个回调函数,即使你在promise已经被解决(resolved, finished)后添加他们,它们也会被调用。

但是那并不是then方法的全部功能。他返回另一个promise,这个promise解析为处理程序返回的值,或者如果返回一个promise,则等待这个promise,然后解析为其结果。

将promise想象成一个将值移动到一个异步现实的设备是很有用的。一个正常值简单地在那。一个promise值就是可能已经在那或者可能在将来某个时间才会出现的值。根据promise定义的计算作用在这些包装值上,并且在值可用时异步执行。

为了创建一个promise,你可以用Promise构造器。它有一个奇怪的接口,构造器接受一个函数作为参数,然后该函数被调用,传递给它一个函数去解析promise。它以这种方式工作,而不是用resolve方法,如此一来只有创建promise的代码才能解析它。

这就是为readStorage函数创建的基于promise的接口。

1
2
3
4
5
6
7
8
9
// 将读取storage的异步方法封装(实现通过直接读取节点的storage对象的name属性获得,并通过setTimeout伪装成异步)。
function storage(nest, name) {
return new Promise(resolve => {
nest.readStorage(name, result => resolve(result));
})
}

storage(bigOak, "enemies");
.then(value => console.log("Got", value));

这个异步函数返回一个有意义的值。这是promise主要的优点,它们简化了异步函数的使用。代替不得不传进去回调,基于promise的函数看上去像普通的函数:它们接收输入作为参数并且返回输出。唯一的区别就是输出还不可用。

失败

常规的JS计算可以因抛出一个异常而失败。异步计算经常需要这样的东西。网络请求可能失败,或者异步计算的某部分代码会抛出异常。

对于回调风格的异步编程最紧迫的问题就是确保失败正确报告给回调是非常困难的。

一个广泛使用的规范就是传递给回调的第一个参数被用于表明行为失败,第二个包含当行为成功时产生的值。这样的回调函数必须检查是否产生了异常并确保它们调用函数导致的抛出的异常,被捕获并传递给正确的函数。

promise使得这个很容易。它们要么可以被resolved(动作成功结束)或者rejecteds(失败了)。resolve处理程序(用then注册)只在动作成功才调用,并且rejections自动传播到由then返回的新的promise。当一个处理程序抛出异常时,这自动导致由then调用产生的promise被rejected。所以如果任何在异步动作链中的元素失败,整个链的输出都被标记为rejected,并且在失败的点的后面没有成功处理程序被调用。

就像resolve一个promise提供一个值一样,reject也会提供一个值,通常叫做rejection的reason。当一个处理函数的异常导致了rejection,异常值被用作reason。相似地,当一个处理程序返回一个rejected的promise,这个rejection流动到下一个promise。有一个Promise.reject函数立刻创建一个全新的rejected的promise。

为了明确处理rejections,promise有一个catch方法注册一个处理程序用于在promise被reject时调用,与then处理程序如何处理正常的resolve类似。在返回新的promise方面也非常类似then方法。如果catch处理程序抛出错误,那么新的promise也是rejected的。

作为一种简略的表达形式,then也接受一个rejection处理程序作为它的第二个参数,所以可以在单个方法调用中安装两种类型的处理程序。

传递给Promise构造器的函数接收第二个参数,挨着resolve函数,可被用于reject新的promise。

由then和catch调用产生的promise值链可以被看作是一个流水线,通过这个流水线,异步值或者失败移动。因为这样的链通过注册处理程序创建,每一个链接都关联一个成功处理程序或者一个rejection处理程序或者二者皆有,不匹配输出(成功或者失败)类型的处理程序被忽略。但是那些类型匹配的被调用,并且他们的输出决定了即将到来的值的类型,如果返回一个非promise值就成功,抛出异常则reject,如果返回promise则取决于promise返回的结果。

1
2
3
4
5
6
7
8
9
new Promise((_, reject) => reject(new Error("Fail")))
.then(value => console.log("Handler 1"))
.catch(reason => {
console.log("Caught failure " + reason);
return "nothing";
})
.then(value => console.log("Handler 2", value));
// → Caught failure Error: Fail
// → Handler 2 nothing

就像未捕获的异常由环境处理一样,当一个promise rejection没有被处理时JS环境可以检测到并且将其作为错误报告。

网络很难

偶尔,对于乌鸦的镜像系统没有足够的光去传播一个信号或者有东西阻塞了信号传播。对于一个发出的信号但是永不会接收到是有可能的。

实际上,这将导致send的回调永远不会调用,这可能导致程序终止,而且我们甚至没有意识到存在问题。如果在给定时间过后还没有收到回应,请求超时并报告失败是不错的。

通常,传输失败是随机的意外,比如汽车的车头灯干扰了光信号,简单地重新尝试请求可能会成功。所以在此过程中,让我们的请求函数在它放弃之前重新发送请求多次。

既然我们已经认定promise是个好东西,我们也要让我们的请求函数返回一个promise。就它们可以表达的而言,回调和primise是等同的。基于回调的函数可以被包装来暴露一个基于promise的接口,反之亦然。

即使当一个请求和响应成功送达,响应也可能会失败,例如,如果请求还没有定义的类型或者处理程序抛出一个错误。为了支持这个,send和defineRequestType遵循之前提及的规范,传递给回调的第一个参数是失败的原因,第二个是实际的结果。

通过我们的包装器可以解释为promise的resolution和rejection。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 使超时容易被区分
class Timeout extends Error {}

function request(nest, target, type, content) {
return new Promise((resolve, reject) => {
let done = false;
function attempt(n) {
// send的回调如果在发送或者处理过程过程中产生error,那么通过failed可以检测到并将错误reject到链式调用的下一步
nest.send(target, type, content, (failed, value) => {
done = true;
if (failed) reject(failed);
else resolve(value);
});
setTimeout(() => {
if (done) return;
else if (n < 3) attempt(n + 1);
else reject(new Timeout("Timed out"));
}, 250);
}
attempt(1);
});
}

非原文翻译:这里额外做一下解释,以免将来忘记。我们这里的意图是想把request函数包装为promise形式。回调版本是基于对象调用,我们这里封装成一个函数,需要把源对象带上,也就是nest,这里作为新函数的第一个参数,而因为不需要callback,所以参数列表还是四个参数。然后我们调用request函数的时候,立刻执行promise构造器中的函数,首先初始化done为false来表示我们还未收到响应,随后调用attempt函数也就是开始尝试发送请求,这个通过nest对象的send方法实现,也是”crow-tech”包中提供的方法。这个方法的第四个参数是响应收到时的回调函数。我们调用send方法之后,由于它是异步方法,所以无需等待立刻执行setTimeout这也是一个异步函数,那么等待250毫秒之后,若我们在这个等待过程中已经收到响应了,那么就会先调用上面send方法的回调函数,就会将done设置为false。若这个等待时间过了,还没有执行刚刚的回调函数,说明响应因为某种原因没有收到,我们调用attempt并把参数值加1,使得三次attempt不成功之后就放弃并报出超时。这里还有一个地方没有说明,就是即便我们收到了响应,由于消息类型未知或者对于该消息的处理程序出现问题,我们也会失败,所以done设置为true之后,还要判断回调函数的第一个参数是不是被设置了,因为上面提到回调函数遵循的规范是第一个参数表明是否失败,而第二个参数才是回调返回的值。如果第一个参数被设置,说明在刚刚说的某个环节出现了问题,那么我们依然要reject,否则才是彻底的没问题,我们resolve结果即可。并一旦在attempt三次之内将done设置为true,我们下一次的执行的setTimeout就会return了。至此函数执行完毕。

因为promise只可以被resolve或者reject一次,这会工作。第一次resolve或者reject调用决定了promise的输出,任何下面的调用,比如在请求完成之后超时到达或者请求在另一次请求完成之后回来,都被忽略了。

为了构建一个异步的循环,对于重新尝试,我们需要一个递归函数,一个普通的循环不允许我们等待一个异步行为。attempt函数做单个attempt去发送一个请求。它也设置了一个超时,也就是说如果250毫秒过后没有响应,要么开始下一次尝试要么如果这是第四次尝试,就用Timeout的一个实例作为reason来reject promise。

每0.25秒重新请求并且1秒后没有响应就直接放弃当然有点武断。如果请求确实传送到了但是对于多次投递的请求处理程序多花费了点时间是可能的。我们将在编写处理程序的时候考虑这个问题——重复的消息应是无害的。

一般来说,我们今天不会构建一个世界级的,健壮的网络。但是那是ok的——乌鸦并没有太高的期望。

为了完全使我们隔离回调,我们继续,也要为defineRequestType定义一个包装器,使之允许处理函数返回一个promise或者普通的值,并将其和回调连接起来。

1
2
3
4
5
6
7
8
9
10
// 将定义消息类型包装到一个基于promise的函数中
function requestType(name, handler) {
defineRequestType(name, (nest, content, source, callback) => {
try {
Promise.resolve(handler(nest, content, source)).then(response => callback(null, response), failure => callback(failure));
} catch (exception) {
callback(exception);
}
});
}

非原文翻译:首先我们这个函数只是调用了defineRequestType这个函数,而在源代码实现中,这个函数只是将network实例的types属性上增加一个名为name的属性,其值为这里传入的handler。而它真正要被用到的是在后面的send函数中,我们在接收节点对数据的处理。而我们在这里传入的handler(接收三参数)并没有直接应用到对应的消息类型,而是将新函数内部的一个四参数的handler应用到该消息类型(每种消息类型都需要一个四参数的handler,最后一个参数包含回调),因为我们最终源代码里应用的是一个四参数的handler,那么我们调用者传入的handler要做什么呢?这个handler只接受三个参数,用于单纯的处理工作,而异步部分由promise完成。我们这里的Promise.resolve接收handler处理的结果,返回的值可能是JS原始值或者是promise值,而如果是原始值,那么就执行then的第一个参数,如果是promise值,就要根据resolve还是reject执行then的对应参数了。并且因为handler函数也可能抛出错误,这时then方法就帮不上什么忙了,需要catch其中的错误,并传递给回调函数。

Promise.resolve被用于将一个handler返回的值转换成promise(如果不是promise)。

注意对handler的调用被包装在try块中来保证任何抛出的异常直接给到回调函数。这很好地阐述了使用原始回调正确处理错误的困难——很容易忘记将异常像这样正确地路由,并且如果你不做这个,失败将不会报告到正确的回调。promise使得这个大多数都是自动的并且更不易出错。

promise集合

每一个巢穴计算机在他的neighbors属性中保存了传输距离内的其他巢穴的数组。为了检查哪些是当前可达的,你可以写一个函数尝试去发送”ping”请求到所有neighbor并观察哪些会有所响应。

当处理运行在同一时间的promise集合时,Promise.all函数是很有用的。它返回一个promise,这个promise等待数组中所有的promise去resolve,然后resolve到一个这些promise产生的值的数组(和原数组一样的顺序)。如果任何promise被reject了,那么Promise.all的结果本身也是reject。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 定义一种消息类型并指定其handler
// 接收到ping消息类型时,handler返回pong
requestType("ping", () => "pong");
// 这里我们要找到从某一巢穴可达的邻居集合,以数组形式体现
function availableNeighbor(nest) {
let requests = nest.neighbors.map(neighbor => {
return request(nest, neighbor, "ping")
.then(() => true, () => false);
});
// requests是一个promise数组,并且promise状态都为resolved,只是值有的为true,有的为false
return Promise.all(requests).then(result => {
return nest.neighbors.filter((_, i) => result[i])
});
}

非原文翻译:对上述代码进一步解释一下,首先nest.neighbors是响应巢穴的邻居数组,数组的map方法用于在数组上每个元素执行一个方法,并把得到的值替换到数组相应位置并返回这个新构造的数组。那么我们通过map方法构建了一个promise的数组,数组首先调用request函数,如果promise被resolve,那么我们调用then方法的第一个函数参数,否则调用then方法的第二个函数参数。结果就是产生了相应promise值为true或者false的promise对象,(这里我查看了浏览器端promise对象包含的内容,发现promise对象由PromiseStatus和PromiseValue两部分组成,如图示)这样我们获得了一个promise数组,并且对于不可达的也就是reject的位置我们将其promiseValue设置为false。这样是为了后面我们Promise.all方法不会fail,从而能成功调用then方法的第一个参数,所以我们利用filter的第二个参数是当前数组索引的事实,来成功筛选出那些promise含有值true的巢穴。

当一个neighbor不可得时,我们不想要整个组合的promise fail因为那样我们仍然不知道任何东西。所以映射到邻居集合的函数将他们转换为请求promise关联的处理程序成功的返回true,失败的返回false。

在组合的promise的处理函数中,filter被用于从邻居数组中移除那些对应值是false的元素。这利用了filter将当前元素的数组索引传递为第二个参数的事实(map,some和相似的高阶数组方法都是一样的)。

网络洪泛

巢穴只可以和他们邻居聊天的方式极大的抑制了网络的用途。

为了广播信息到整个网络,一个解决方案就是建立一种类型的请求来自动向邻居转发。这些邻居反过来在向它们的邻居转发,直到整个网络收到消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// everywhere是一个绑定到network实例执行的函数,对network的nodes对象的各个属性执行传入的函数
import {everywhere} from "./crow-tech";

// 为network中的nodes对象中的各节点对象调用该函数
// 每个节点对象含有一个state属性对象,我们为了避免重复发送gossip,需要记录一个接受过的gossip数组。
everywhere(nest => {
nest.state.gossip = [];
});

// 从某一节点向除了exceptFor之外的其他邻居节点发送gossip
function sendGossip(nest, message, exceptFor = null) {
nest.state.gossip.push(message);
for(let neighbor of nest.neighbors) {
if(neighbor == exceptFor) continue;
request(nest, neighbor, "gossip", message)
}
}
// 定义接收gossip时应该如何处理,注意这里传入的handler就是三参数的handler,不包含回调,因为我们在上面封装的函数中已经做处理了
requestType("gossip", (nest, message, source) => {
// 如果已经见过了该消息,那么就不再继续处理
if(nest.state.gossip.includes(message)) return;
// 打印一段提示性信息
console.log(`${nest.name} received gossip '${message}' from ${source}`);
// 向除了源节点外的其他邻居节点发送gossip
sendGossip(nest, message, source);
});

为了避免一直在网络中发送重复的消息,每一个巢穴保持它已经看见的gossip的字符串数组。为了定义这个数组,我们使用了everywhere函数——在每个巢穴上运行代码——对巢穴的state对象添加一个属性,在这个属性上我们保存巢穴局部的状态。

当一个巢穴收到一个重复的gossip消息,这个消息很有可能是别人盲目的重新发送过来的,它就会忽略他。但是当收到一个新消息的时候,它会将消息发送给除了发给它的那个源的其他所有邻居。

这将会使得一段gossip在网络中的传播犹如在水中墨水的扩散过程一样。即使有一些连接当前无效,如果有一个去给定巢穴的替代路由,这个gossip也会经由替代路由到达那里。

这种类型的网络通信叫做洪泛(flooding)——用一段信息洪范到整个网络直到所有节点接收到。

我们可以调用sendGossip来在整个村庄发送消息。

1
sendGossip(bigOak, "Kids with airgun in the park");

消息路由

如果一个给定节点想要和其他单个节点聊天,洪泛不是一个非常高效的方法。尤其当网络很大的时候,那将会导致大量无用信息的传输。

一个替代方案是开发一种新的方式让消息去从一个节点跳到另一个节点直到它们到达终点。这种方案的困难之处在于需要知道整个网络的结构。为了向遥远的巢穴发送一个请求,需要知道哪一个邻居巢穴可以转发这个消息到终点。发送到错误的方向不是太好。

因为每个巢穴都只知道直接相连的邻居,没有需要的信息去计算一个路由。我们必须以某种方式传播这些连接信息到所有的巢穴,最好是以一种可随时间动态变化的方式,比如当巢穴被遗弃或者新的巢穴被建筑。

我们可以再一次使用洪泛,但是不在检查消息是否已经收到过,而是检查给定巢穴的新的邻居集合是否匹配我们当前它的邻居集合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 定义消息类型以及三参数的handler,注意这里的消息内容是一个含有两个属性的对象
requestType("connections", (nest, {name, neighbors}, source) {
// 和上面ping pong的函数类似,首先要获取到我们定义在state对象上的connections,为一个map,包含一个键值对,键为自己的name,值为neighbors数组
let connections = nest.state.connections;
// 接收到的neighbors若和之前name下记录的neighbors相同,则不做任何改动
// 这里还有一个关于数组比较的小hack
if(JSON.stringify(connections.get(name)) == JSON.stringify(neighbors)) return;
// 说明有变动或者初次接收这个连接信息
// 在自己的连接状态map中为这个name增加一个键值对
connections.set(name, neighbors);
// 广播连接信息
broadcastConnections(nest, name, source);
});

//
function broadcastConnections(nest, name, exceptFor = null) {
// 除了exceptFor指明的邻居,向其他邻居发送connection类型的消息,内容为一个包含该name以及对应name的neighbors数组的对象。
for(let neighbor of nest.neighbors) {
if(neighbor == exceptFor) continue;
request(nest, neighbor, "connections", {
name,
neighbors: nest.state.connections.get(name)
});
}
}

// 对网络中的所有节点进行的操作
everywhere(nest => {
// 在每个节点的state对象上新增一个connections属性,并将其初始化为一个map
nest.state.connections = new Map;
// 在节点的connections map上新增本身的name和neighbors键值对
nest.state.connections.set(nest.name, nest.neighbors);
// 从自己开始广播connections,由于每个节点从自身开始,这一步不包含exceptFor
broadcastConnections(nest, nest.name);
});

比较使用了JSON.stringify由于==,在对象或者数组只在两个是一样的引用的时候才返回true,并不是我们在这里想要的结果。而比较JSON字符串是一种简单高效的比较内容的方法。

节点立刻开始广播它们的连接,除非有些巢穴完全不可达,否则很快就使得每个巢穴获得当前网络图的结构。

如同我们在第七章看到的一样,你可以用图来做的一件事就是在图中寻找路由。如果我们有一个朝着消息终点的路由,我们知道要把它发送到哪个方向。

这个findRoute函数,非常类似第七章的findRoute函数,寻找到达网络中一个给定节点的路径。但是代替返回整个路由,它仅仅返回下一跳。下一跳是用自己当前关于整个网络的信息得到的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// BFS找路径
function findRoute(from, to, connections) {
let work = [{at: from, via: null}];
for(let i = 0; i < work.length; i++) {
let {at, via} = work[i];
// 如果不存在at的connections信息,用[],不加这个或会导致这种情况报错
for(let next of connections.get(at) || []) {
// 如果已经找到一个路径
if(next == to) return via;
// 如果有一个邻居没有访问过,就加到向外生长的队列中
if(!work.some(w => w.at == next)) {
// 初始情况via为null,所以这种情况下的via为next
work.push({at: next, via: via || next});
}
}
}
return null;
}

现在我们构建了一个可以发送长距离消息的函数。如果这个消息的终点是直接邻居,那么就正常投递。否则,被包装到一个对象中,并且发送到一个靠近终点的邻居,使用”route”请求类型,这个行为会导致邻居重复相同的行为。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 在看过前面几个代码例子后,我们发现就是我们每次增加需求的时候,都需要定义两个函数
// 一个就是定义消息类型的函数
// 另一个就是发送这种消息请求的函数

// 发送一个路由请求
function routeRequest(nest, target, type, content) {
// 当要发送的终点节点是直接邻居时,正常发送request即可
if(nest.neighbors.includes(target)) {
return request(nest, target, type, content);
} else {
// 否则我们需要利用findRoute寻找下一跳
let via = findRoute(nest.name, target, nest.state.connections);
// 有可能找不到这样的路由,需要抛出错误
if(!via) throw new Error(`No such route to ${target}`);
// 否则向下一跳发送包装过后的content。消息类型为route
return request(nest, via, "route", {target, type, content});
}
}

// 定义route消息类型,直接调用routeRequest
requestType("route", (nest, {target, type, content}) => {
return routeRequest(nest, target, type, content);
});

我们现在可以向church tower巢穴发送一个消息了,有四跳那么远。

1
routeRequest(bigOak, "Church Tower", "note", "Incoming jackdaws!");

我们已经在原始通讯系统上构建了几层的功能来使他更容易使用。这是一个很好(虽然简化了)的真实计算机网络工作模型。

计算机网络的不同的地方在于它们是不可依赖的——构建在它们上边的抽象并不能帮到什么忙,但是你不能将网络失败抽象走。所以网络编程通常是关于预测和处理失败。

async函数

为了存储重要的信息,乌鸦们可以在巢穴间复制信息。以这种方式,就不怕老鹰过来破坏他们的巢穴而致使信息丢失了。

为了检索自己巢穴没有储存的信息,一个巢穴计算机不得不随机向网络中的其他巢穴寻求帮助直到它们找到一个拥有这个信息的巢穴。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 我们定义一个storage的消息类型,handler只需要name参数即可,第一个nest为接收方节点对象,然后返回调用storage函数的结果。
requestType("storage", (nest, name) => storage(nest, name));

// 前面将storage封装为了promise版本,接收到返回结果后,执行then中的函数,如果当前节点找得到,那么直接返回,否则调用findRemoteStorage
function findInStorage(nest, name) {
return storage(nest, name).then(found => {
if (found != null) return found;
else return findInRemoteStorage(nest, name);
});
}

// connections是一个map对象,keys方法返回一个iterator,用Array.from可以将其包装为一个数组。
function network(nest) {
return Array.from(nest.state.connections.keys());
}

function findInRemoteStorage(nest, name) {
// 将connections对象中等于当前nest name的筛选掉,只剩下非当前nest的其他nest的connections信息
let sources = network(nest).filter(n => n != nest.name);
function next() {
// 如果全被筛选掉了,直接reject到findInStorage
if (sources.length == 0) {
return Promise.reject(new Error("Not found"));
} else {
// 随机选中一个source,将其筛选出去,留下其余
let source = sources[Math.floor(Math.random() * sources.length)];
sources = sources.filter(n => n != source);
// 从当前节点向随机出去的节点发送storage类型的消息,如果返回值为null,则继续调用next直到reject或者找到一个非nulll的值
return routeRequest(nest, source, "storage", name).then(value => value != null ? value : next(), next);
}
}
return next();
}

因为connections是一个map。Object.keys在map不能用。map有个keys方法,但是这返回一个itertor而不是一个数组。一个iterator(或者iterable)可以通过Array.from转换成数组。

即便是用promise这段代码也比较笨拙的。多个异步动作以一种不明显的方式链到一起。我们再一次需要一个递归函数(next)来为在这些巢穴间的循环建模。

代码实际上做的事完全线性的,他总是等待前面的行动完成才开始下一次行动。用一种同步的编程模型,这可能更容易去表达。

好消息是JS允许你书写伪同步代码去描述异步计算。async函数是隐式返回promise的函数,并且它可以在函数体内以一种看上去像同步的方式await其他promises。

我们可以像这样重写findInStorage函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async function findInStorage(nest, name) {
let local = await storage(nest, name);
if (local != null) return local;

let sources = network(nest).filter(n => n != nest.name);
while (sources.length > 0) {
let source = sources[Math.floor(Math.random() * sources.length)];
sources = sources.filter(n => n != source;
try {
let found = await routeRequest(nest, source, "storage", name);
if (found != null) return found;
} catch (_) {}
}
throw new Error("Not found");
}

async函数通过在function关键字之前加一个async单词标记。方法也可以成为async的通过在它们的名字之前写一个async。当这样的函数或者方法调用时,它返回一个promise。函数体一返回什么的时候,这个promise就被resolve了。如果抛出异常,则promise被reject。

1
findInStorage(bigOak, "events on 2017-12-21").then(console.log);

在一个async函数中,await单词可以放在一个表达式之前用来等待一个promise去resolve并且只有那样才继续执行函数。

这样的函数不再像一个普通的JS函数,从头运行到尾。法案二,他可以在任何有await的地方被冻结,并且可在之后继续执行。

对于重要的异步代码,这种改变通常比直接使用promise更方便。即使你需要做一些不遵循同步模型的事情,例如在同一时刻采取多个行动,组合await和promise的直接使用是很容易的。

Generators

这种可以暂停并再次继续函数的能力不是async专有的。JS还有一个叫做generator函数的特性。它们是相似的,但是generator没有promise。

当你用function*定义函数的时候(在关键字function后添加一个*),它就变成了一个generator。当你调用一个generator的时候,它返回一个iterator,我们在第六章已经说明过。

1
2
3
4
5
6
7
8
9
10
11
12
13
function* powers(n) {
for(let current = n;; current *= n) {
yield current;
}
}

for(let power of powers(3)) {
if(power > 50) break;
console.log(power);
}
// -> 3
// -> 9
// -> 27

初始情况下,当你调用powers的时候,这个函数开始被冻结。每一次在iterator上调用next的时候,函数运行直到遇见一个yield表达式,暂停并导致yield的值成为iterator下一个产生的值。当函数返回的时候(例子中并没有返回),iterator结束。

当使用generator函数时更容易书写iterator。Group类的iterator(第六章练习)可以用这个generator来写。

1
2
3
4
5
Group.prototype[Symbol.iterator] = function*() {
for(let i = 0; i < this.members.length; i++) {
yield this.members[i];
}
}

不再需要创建一个对象来保存迭代状态了,generators每一次yield的时候自动保存他们的局部状态。

这样的yield表达式只会直接出现在generator函数本身中,而不会在你在其内部定义的函数。当yield的时候,generator保存的状态只是它的局部环境和yield的位置。

一个async函数是一种特殊类型的generator。当调用的时候产出一个promise, 当返回(完成)的时候被resolve,当抛出异常的时候reject。无论什么时候它yield(await)一个promise,那个promise(值或者抛出的异常)的结果就是await表达式的结果。

事件循环

异步程序是一块接一块执行的。当行动结束或者失败时每一块都可能开始执行一些行为和schedule代码。在这些块之间,程序处于空闲状态,等待下一次行动。

所以回调并没有被schedule他们的代码直接调用。如果我在一个函数内部调用setTimeout,直到回调函数调用时,那个函数可能已经返回了。当回调返回的时候,控制并没有回到调用它的那个函数。

异步行为发生在它自己的空的函数调用栈。这是没有promise时,在异步代码间管理异常困难的原因之一。因为每一个回调开始于一个空的栈,你的catch处理程序将不会在它们抛出异常的那个栈。

1
2
3
4
5
6
7
8
try {
setTimeout(() => {
throw new Error("Woosh");
}, 20);
} catch (_) {
// This will not run
console.log("Caught");
}

不管事件有多密集地发生,如超时或者到来的请求事件,一个JS环境一次只能运行一个程序。你可以将这个想象为在整个程序运行一个大的循环,叫做事件循环。当没有事情要做的时候,那个循环就停止了。但是随着事件不断来临,它们被添加到队列中,并且他们的代码一个挨一个的执行。因为不可能有两件事情同时运行,执行慢的代码可能延迟其他事件的处理。

这个例子设置了一个超时,但是一直拖到超时的预定时间点之后,导致超时延迟。

1
2
3
4
5
6
7
8
let start = Date.now();
setTimeout(() => {
console.log("Timeout ran at", Date.now() - start);
}, 20);
while (Date.now() < start + 50) {}
console.log("Wasted time until", Date.now() - start);
// → Wasted time until 50
// → Timeout ran at 55

promise总是作为一个新事件resolve或者reject。即便一个promise已经resolve了,等待它将会导致你的回调在当前脚本完成之后运行,而不是立刻运行。

1
2
3
4
Promise.resolve("Done").then(console.log);
console.log("Me first");
// -> Me first
// -> Done

在之后的章节里,我们将会看到运行在事件循环上的各种各样类型的其他事件。

异步bug

当你的程序同步运行的时候,再一次运行中,除了那些程序所做的状态改变没有其他的状态改变。对于异步程序,这是不同的,它们可能在执行过程中有其他代码可以运行的间隙。

让我们看个例子。我们的乌鸦的兴趣之一就是查每年在村子里孵化小鸡的数量。巢穴在让门的存储灯泡中储存这个数量。下面的代码尝试从给定年份所有的巢穴中枚举这个数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
function anyStorage(nest, source, name) {
if (source == nest.name) return storage(nest, name);
else return routeRequest(nest, source, "storage", name);
}

async function chicks(nest, year) {
let list = "";
await Promise.all(network(nest).map(async name => {
list += `${name}: ${
await anyStorage(nest, name, `chicks in ${year}`)
}\n`;
}));
return list;
}

async name =>部分说明可以在箭头函数前面加async来使得其成为async的函数。

这段代码乍一看没有可疑之处…它在巢穴集合上映射async函数,创造了一个promise数组,并且在返回构建的list之前用Promise.all等待他们中的所有resolve或是reject。

但是确实有严重的错误。他将总是返回一个单行的输出,列出最慢响应的巢穴。

1
chicks(bigOak, 2017).then(console.log);

你能发现原因吗?

问题就是+=操作符,接收语句开始执行时候的list的当前值,然后再await完成的时候,将list绑定设置为加上added字符串的值。

但是在语句开始的时间和完成的时间有一个异步的空子。这个map表达式在任何东西被添加到列表之前运行,所以每一个+=操作符开始于一个空字符串并且当存储解锁完成终止,将list设置为一个单行的list——添加这一行到空字符串的结果。

这通过从映射的promise返回一行和在Promise.all的结果上调用join可以很容易地避免,而不是通过改变绑定来构建列表。像往常一样,计算新值比起改变存在的值更不宜产生错误。

1
2
3
4
5
6
7
async function chicks(nest, year) {
let lines = network(nest).map(async name => {
return name + ": " +
await anyStorage(nest, name, `chicks in ${year}`);
});
return (await Promise.all(lines)).join("\n");
}

类似这样的错误容易制造,尤其是使用await的时候,你应该意识到你的代码中哪里有gap存在。一个JS明确异步性(不管是通过回调,promise还是await)的好处就是识别这些gap相对容易。

附录:crow-tech代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
(function() {
// 仍然以from-to的形式指明图结构
const connections = [
"Church Tower-Sportsgrounds", "Church Tower-Big Maple", "Big Maple-Sportsgrounds",
"Big Maple-Woods", "Big Maple-Fabienne's Garden", "Fabienne's Garden-Woods",
"Fabienne's Garden-Cow Pasture", "Cow Pasture-Big Oak", "Big Oak-Butcher Shop",
"Butcher Shop-Tall Poplar", "Tall Poplar-Sportsgrounds", "Tall Poplar-Chateau",
"Chateau-Great Pine", "Great Pine-Jacques' Farm", "Jacques' Farm-Hawthorn",
"Great Pine-Hawthorn", "Hawthorn-Gilles' Garden", "Great Pine-Gilles' Garden",
"Gilles' Garden-Big Oak", "Gilles' Garden-Butcher Shop", "Chateau-Butcher Shop"
]

//storageFor返回一个和给定name对象存储相关的对象。键为字符串,值为json字符串(对于值为数组或者对象方便我们处理)
function storageFor(name) {
let storage = Object.create(null)
storage["food caches"] = ["cache in the oak", "cache in the meadow", "cache under the hedge"]
storage["cache in the oak"] = "A hollow above the third big branch from the bottom. Several pieces of bread and a pile of acorns."
storage["cache in the meadow"] = "Buried below the patch of nettles (south side). A dead snake."
storage["cache under the hedge"] = "Middle of the hedge at Gilles' garden. Marked with a forked twig. Two bottles of beer."
storage["enemies"] = ["Farmer Jacques' dog", "The butcher", "That one-legged jackdaw", "The boy with the airgun"]
if (name == "Church Tower" || name == "Hawthorn" || name == "Chateau")
storage["events on 2017-12-21"] = "Deep snow. Butcher's garbage can fell over. We chased off the ravens from Saint-Vulbas."
let hash = 0
for (let i = 0; i < name.length; i++) hash += name.charCodeAt(i)
for (let y = 1985; y <= 2018; y++) {
storage[`chicks in ${y}`] = hash % 6
hash = Math.abs((hash << 2) ^ (hash + y))
}
if (name == "Big Oak") storage.scalpel = "Gilles' Garden"
else if (name == "Gilles' Garden") storage.scalpel = "Woods"
else if (name == "Woods") storage.scalpel = "Chateau"
else if (name == "Chateau" || name == "Butcher Shop") storage.scalpel = "Butcher Shop"
else storage.scalpel = "Big Oak"
for (let prop of Object.keys(storage)) storage[prop] = JSON.stringify(storage[prop])
return storage
}

class Network {
constructor(connections, storageFor) {
// reachable包含图拓扑,为一个对象,键为巢穴名字,值为邻居巢穴字符串数组
let reachable = Object.create(null)
for (let [from, to] of connections.map(conn => conn.split("-"))) {
;(reachable[from] || (reachable[from] = [])).push(to)
;(reachable[to] || (reachable[to] = [])).push(from)
}
// 创建的network实例有一个nodes对象,键为网络节点名字,值为node实例对象
this.nodes = Object.create(null)
// Object.keys只返回对象本身的键
for (let name of Object.keys(reachable))
// 传入当前处理节点名字,邻居巢穴字符串数组, 网络实例和对应name的存储相关对象
this.nodes[name] = new Node(name, reachable[name], this, storageFor(name))
this.types = Object.create(null)
}

// 定义请求类型,接收节点名字,和一个处理函数
defineRequestType(name, handler) {
this.types[name] = handler
}

// 对于网络实例中的nodes对象,在其上调用同一个函数
everywhere(f) {
for (let node of Object.values(this.nodes)) f(node)
}
}

// 后面用symbol作键
const $storage = Symbol("storage"), $network = Symbol("network")

// 如是null或者undefined,则返回null,否则将值变为JSON字符串再进行解析,原因暂时还不清楚(19.1.13)?
function ser(value) {
return value == null ? null : JSON.parse(JSON.stringify(value))
}


class Node {
// 每个节点包含名字,邻居,网络,状态和存储相关属性
constructor(name, neighbors, network, storage) {
this.name = name
this.neighbors = neighbors
this[$network] = network
this.state = Object.create(null)
this[$storage] = storage
}

// 向其他节点发送消息,参数包括终点节点,消息类型,消息内容以及回调函数
send(to, type, message, callback) {
// 获取终点节点实例
let toNode = this[$network].nodes[to]
// 不存在这个节点或者不是直接邻居节点,那么结束函数,并传递error到回调函数,参数为一个说明性的字符串
if (!toNode || !this.neighbors.includes(to))
return callback(new Error(`${to} is not reachable from ${this.name}`))
// 是邻居节点我们获取到这种类型消息的处理函数(handler)
let handler = this[$network].types[type]
// 如果没有注册这种消息类型,我们结束函数,传递未知请求类型错误到回调
if (!handler)
return callback(new Error("Unknown request type " + type))
// 有这种消息类型,有3%概率发送失败
if (Math.random() > 0.03) setTimeout(() => {
try {
handler(toNode, ser(message), this.name, (error, response) => {
setTimeout(() => callback(error, ser(response)), 10)
})
} catch(e) {
callback(e)
}
}, 10 * Math.floor(Math.random() * 10))
}

// 读取该节点中storage的属性,并在获取JSON字符串值后将其解析为JS常规值传入回调函数,当要读取的属性不存在时,将其传入回调,若存在(不为undefined,则因其存储为JSON字符串,需要解析后传入回调函数)。
readStorage(name, callback) {
let value = this[$storage][name]
setTimeout(() => callback(value && JSON.parse(value)), 20)
}

// 直接将键值对写入$storage属性对象,注意值要存入JSON字符串形式,然后空参调用回调
writeStorage(name, value, callback) {
setTimeout(() => {
this[$storage][name] = JSON.stringify(value)
callback()
}, 20)
}
}

// 构建一个网络对象,传入connections数组以及storageFor函数
let network = new Network(connections, storageFor)
// 对外暴露三个接口,bigOak表示bigOak节点实例
exports.bigOak = network.nodes["Big Oak"]
// 将network绑定到everywhere函数中的this
exports.everywhere = network.everywhere.bind(network)
// 将defineRequestType中的this绑定到该network实例
exports.defineRequestType = network.defineRequestType.bind(network)

if (typeof __sandbox != "undefined") {
__sandbox.handleDeps = false
__sandbox.notify.onLoad = () => {
// Kludge to make sure some functions are delayed until the
// nodes have been running for 500ms, to give them a chance to
// propagate network information.
let waitFor = Date.now() + 500
function wrapWaiting(f) {
return function(...args) {
let wait = waitFor - Date.now()
if (wait <= 0) return f(...args)
return new Promise(ok => setTimeout(ok, wait)).then(() => f(...args))
}
}
for (let n of ["routeRequest", "findInStorage", "chicks"])
window[n] = wrapWaiting(window[n])
}
}

if (typeof window != "undefined") {
window.require = name => {
if (name != "./crow-tech") throw new Error("Crow nests can only require \"./crow-tech\"")
return exports
}
} else if (typeof module != "undefined" && module.exports) {
module.exports = exports
}
})()