Promise 回调队列优先处理一些回调
Promise callback queue with priority for some callbacks
我的代码需要同时执行大量异步操作并以特定的顺序方式处理它们的承诺。
我所说的 "specific sequential way" 的意思 ==> 假设您按此顺序开始承诺 promise1
、promise2
、promise3
,并且 promise3 实际上首先解决,然后是 promise2,那么我想要的是按顺序依次处理 promise3
、promise2
和 promise1
让我们考虑一个 timeout
异步函数,它在 X 秒后超时,并且 fetchMyItem1
、fetchMyItem2
、return 承诺在实现时应该执行一个不同的代码取决于 timeout
是否已解决。
对于一个具体的场景,假设有一位顾客在柜台等待送货,要么顾客留下来,我们可以一次拿一件物品直接在柜台为他服务,要么顾客走了(timeout
),我们不得不请服务员过来,以便当物品到达时,he/she 可以将物品带给他。此处请注意,即使一件物品正在交付,其他物品仍应进行交付(承诺待定),甚至可能在为客户提供一件物品或服务器到达时到达(履行)。
这是一些开始的代码
const allItemsToBeDeliveredPromises = [fetchMyItem1(), fetchMyItem2(), ...]
const customerLeavesCounterPromise = timeout()
const waiter = undefined
const allPromisesToBeFulfilled = [...allItemsToBeDeliveredPromises, customerLeavesCounterPromise]
// LOOP
const itemDeliveredOrVisitorLeft = await Promise.all(allPromisesToBeFulfilled)
if hasCustomerLeft(itemDeliveredOrCustomerLeft) {
// hasCustomerLeft allows us to detect if the promise that resolved first is `customerLeavesCounterPromise` or not
waiter = await callWaiter()
} else {
// An item has arrived
if (waiter) {
deliverItemViaWaiter(itemDeliveredOrVisitorLeft)
} else {
deliverItemAtCounter(itemDeliveredOrVisitorLeft)
}
}
// remove itemDeliveredOrCustomerLeft from allPromisesToBeFulfilled
// END loop
我不确定如何为这种情况实现循环。 Promise 必须在 resolve 时累积到队列中,但是队列中的特定 promise 有优先级(超时 promise 到达时应尽快执行,但如果 promise fulfillment 已经完成,则在完成当前 promise 的处理之后正在处理)
one of them must have the effect that, whenever it fulfils, it should alter the processing of other fulfilled promises that have not yet been processed or are not yet being processed.
你运行他们在一起吗?类似于:
promise1 = asyncFunc1()
promise2 = asyncFunc2()
promise3 = asyncFunc3()
Promise.all([promise1, promise2, promise3]).then(//sometthing)
如果是,则无法完成,除非 promise2 函数和 promise3 函数不在等待事件、锁或由 promise1 函数处理的内容。
如果是这种情况,最好组织这样的承诺:
asyncFunc1()
.then(() => asyncFunc2(paramIfOk))
.catch(() => asyncFunc2(paramIfFail))
在你的例子中:
const allItemsToBeDelivered = [myPromise1(), myPromise2(), ...]
myPromise1() 代码应该等待项目交付 和 检查是否有人在等待它。这是一个 model/code 设计问题,而不是承诺问题。
另一种方法是考虑一些事件驱动的逻辑:实体 Customer 有一个已交付事件的侦听器,该侦听器将在解析自身之前由 waitItemDelivered() promise 触发。
编辑:根据此处的要求,对事件驱动的解决方案进行更多阐述。
简短回答:这在很大程度上取决于您的软件设计。
它已经发布并且 运行正在生产中?小心这样的变化。如果这是您正在开发的服务,您仍然及时考虑一些逻辑更改。一个不会从根本上改变工作方式但使用事件的解决方案并不是那么好,混合模式从长远来看永远不会有回报。
示例:
const events = require('events');
class Customer() {
constructor(shop) {
this.emitter = new events.EventEmitter()
this.shopEventListener = shop.emitter
this.setupEventLinstening() // for keeping things clean in the constructor
// other properties here
}
setupEventLinstening() {
this.shopEventListener.on('product-delivered', (eventData) => {
// some logic
})
}
buyProduct() {
// some logic
this.emitter.emit('waiting-delivery', eventData)
}
}
class Shop() {
constructor(shop) {
this.emitter = new events.EventEmitter()
// other properties here
}
addCustomer(customer) {
customer.emitter.on('waiting-delivery', (eventData) => {
// some logic
self.waitDelivery().then(() => self.emitter.emit('product-delivered'))
})
}
waitDelivery() {
return new Promise((resolve, reject) => {
// some logic
resolve()
})
}
}
// setup
const shop = new Shop()
const customer = new Customer(shop)
shop.addCustomer(customer)
这是一种查看逻辑的新方法,但可以在 promise 中使用类似的方法:
const waitDelivery = () => new Promise((resolve, reject) => {
logisticWaitDelivery().then(() => {
someEmitter.emit('item-delivered')
resolve()
})
}
const customerPromise = () => new Promise((resolve, reject) => {
someListener.on('item-delivered', () => {
resolve()
})
}
promiseAll([waitDelivery, customerPromise])
制作 2 个队列,第二个队列在第一个队列结束后开始:
var highPriority=[ph1,ph2,...]//promises that have to be executed first
var lowPriority=[pl1,pl2,...]//promises that have to wait the high priority promises
Promise.all(highPriority).then(()=>{
Promise.all(lowPriority)
})
My code needs to perform a multitude of async actions simultaneously and handle their promises in a specific sequential way.
您可以使用流来使用承诺,因为流本质上是一次处理一条消息的队列。
想法(即未测试):
import { Readable, Writable } from 'stream';
let customerHasLeft = false;
/*const customerLeavesCounterPromise = */timeout() // your code...
.then(() => { customerHasLeft = true; }); // ... made boolean
// let's push all our promises in a readable stream
// (they are supposedly sorted in the array)
const input = new Readable({
objectMode: true,
read: function () { // don't use arrow function: we need `this`
const allItemsToBeDeliveredPromises = [fetchMyItem1(), fetchMyItem2(), ...]; // your code
// put everything, in the same order, in the output queue
allItemsToBeDeliveredPromises.forEach(p => this.push(p));
this.push(null); // terminate the stream after all these
}
});
// let's declare the logic to process each promise
// (depending on `timeout()` being done)
const consumer = new Writable({
write: async function (promise, uselessEncoding, callback) {
try {
const order = await promise; // wait for the current promise to be completed
} catch (e) {
/* delivery error, do something cool like a coupon */
return callback(e); // or return callback() without e if you don't want to crash the pipe
}
if (customerHasLeft) { /* find a waiter (you can still `await`) and deliver `order` */ }
else { /* deliver `order` at the counter */ }
callback(); // tell the underlying queue we can process the next promise now
}
});
// launch the whole pipe
input.pipe(consumer);
// you can add listeners on all events you'd like:
// 'error', 'close', 'data', whatever...
编辑:实际上我们希望在承诺解决时按顺序处理承诺(即所有承诺的单个 post 流程)
let customerHasLeft = false;
timeout() // your code...
.then(() => { customerHasLeft = true; }); // ... made boolean
const allItemsToBeDeliveredPromises = [fetchMyItem1(), fetchMyItem2(), ...];
const postProcessChain = Promise.resolve(); // start with a promise ready to be thened
// add a next step to each promise so that as soon as one resolves, it registers
// as a next step to the post-process chain
allItemsToBeDeliveredPromises.forEach(p => p.then(order => postProcessChain.then(async () => {
// do something potentially async with the resulting order, like this:
if (customerHasLeft) { /* find a waiter (you can still `await`) and deliver `order` */ }
else { /* deliver `order` at the counter */ }
})));
我的代码需要同时执行大量异步操作并以特定的顺序方式处理它们的承诺。
我所说的 "specific sequential way" 的意思 ==> 假设您按此顺序开始承诺 promise1
、promise2
、promise3
,并且 promise3 实际上首先解决,然后是 promise2,那么我想要的是按顺序依次处理 promise3
、promise2
和 promise1
让我们考虑一个 timeout
异步函数,它在 X 秒后超时,并且 fetchMyItem1
、fetchMyItem2
、return 承诺在实现时应该执行一个不同的代码取决于 timeout
是否已解决。
对于一个具体的场景,假设有一位顾客在柜台等待送货,要么顾客留下来,我们可以一次拿一件物品直接在柜台为他服务,要么顾客走了(timeout
),我们不得不请服务员过来,以便当物品到达时,he/she 可以将物品带给他。此处请注意,即使一件物品正在交付,其他物品仍应进行交付(承诺待定),甚至可能在为客户提供一件物品或服务器到达时到达(履行)。
这是一些开始的代码
const allItemsToBeDeliveredPromises = [fetchMyItem1(), fetchMyItem2(), ...]
const customerLeavesCounterPromise = timeout()
const waiter = undefined
const allPromisesToBeFulfilled = [...allItemsToBeDeliveredPromises, customerLeavesCounterPromise]
// LOOP
const itemDeliveredOrVisitorLeft = await Promise.all(allPromisesToBeFulfilled)
if hasCustomerLeft(itemDeliveredOrCustomerLeft) {
// hasCustomerLeft allows us to detect if the promise that resolved first is `customerLeavesCounterPromise` or not
waiter = await callWaiter()
} else {
// An item has arrived
if (waiter) {
deliverItemViaWaiter(itemDeliveredOrVisitorLeft)
} else {
deliverItemAtCounter(itemDeliveredOrVisitorLeft)
}
}
// remove itemDeliveredOrCustomerLeft from allPromisesToBeFulfilled
// END loop
我不确定如何为这种情况实现循环。 Promise 必须在 resolve 时累积到队列中,但是队列中的特定 promise 有优先级(超时 promise 到达时应尽快执行,但如果 promise fulfillment 已经完成,则在完成当前 promise 的处理之后正在处理)
one of them must have the effect that, whenever it fulfils, it should alter the processing of other fulfilled promises that have not yet been processed or are not yet being processed.
你运行他们在一起吗?类似于:
promise1 = asyncFunc1()
promise2 = asyncFunc2()
promise3 = asyncFunc3()
Promise.all([promise1, promise2, promise3]).then(//sometthing)
如果是,则无法完成,除非 promise2 函数和 promise3 函数不在等待事件、锁或由 promise1 函数处理的内容。
如果是这种情况,最好组织这样的承诺:
asyncFunc1()
.then(() => asyncFunc2(paramIfOk))
.catch(() => asyncFunc2(paramIfFail))
在你的例子中:
const allItemsToBeDelivered = [myPromise1(), myPromise2(), ...]
myPromise1() 代码应该等待项目交付 和 检查是否有人在等待它。这是一个 model/code 设计问题,而不是承诺问题。
另一种方法是考虑一些事件驱动的逻辑:实体 Customer 有一个已交付事件的侦听器,该侦听器将在解析自身之前由 waitItemDelivered() promise 触发。
编辑:根据此处的要求,对事件驱动的解决方案进行更多阐述。
简短回答:这在很大程度上取决于您的软件设计。
它已经发布并且 运行正在生产中?小心这样的变化。如果这是您正在开发的服务,您仍然及时考虑一些逻辑更改。一个不会从根本上改变工作方式但使用事件的解决方案并不是那么好,混合模式从长远来看永远不会有回报。
示例:
const events = require('events');
class Customer() {
constructor(shop) {
this.emitter = new events.EventEmitter()
this.shopEventListener = shop.emitter
this.setupEventLinstening() // for keeping things clean in the constructor
// other properties here
}
setupEventLinstening() {
this.shopEventListener.on('product-delivered', (eventData) => {
// some logic
})
}
buyProduct() {
// some logic
this.emitter.emit('waiting-delivery', eventData)
}
}
class Shop() {
constructor(shop) {
this.emitter = new events.EventEmitter()
// other properties here
}
addCustomer(customer) {
customer.emitter.on('waiting-delivery', (eventData) => {
// some logic
self.waitDelivery().then(() => self.emitter.emit('product-delivered'))
})
}
waitDelivery() {
return new Promise((resolve, reject) => {
// some logic
resolve()
})
}
}
// setup
const shop = new Shop()
const customer = new Customer(shop)
shop.addCustomer(customer)
这是一种查看逻辑的新方法,但可以在 promise 中使用类似的方法:
const waitDelivery = () => new Promise((resolve, reject) => {
logisticWaitDelivery().then(() => {
someEmitter.emit('item-delivered')
resolve()
})
}
const customerPromise = () => new Promise((resolve, reject) => {
someListener.on('item-delivered', () => {
resolve()
})
}
promiseAll([waitDelivery, customerPromise])
制作 2 个队列,第二个队列在第一个队列结束后开始:
var highPriority=[ph1,ph2,...]//promises that have to be executed first
var lowPriority=[pl1,pl2,...]//promises that have to wait the high priority promises
Promise.all(highPriority).then(()=>{
Promise.all(lowPriority)
})
My code needs to perform a multitude of async actions simultaneously and handle their promises in a specific sequential way.
您可以使用流来使用承诺,因为流本质上是一次处理一条消息的队列。
想法(即未测试):
import { Readable, Writable } from 'stream';
let customerHasLeft = false;
/*const customerLeavesCounterPromise = */timeout() // your code...
.then(() => { customerHasLeft = true; }); // ... made boolean
// let's push all our promises in a readable stream
// (they are supposedly sorted in the array)
const input = new Readable({
objectMode: true,
read: function () { // don't use arrow function: we need `this`
const allItemsToBeDeliveredPromises = [fetchMyItem1(), fetchMyItem2(), ...]; // your code
// put everything, in the same order, in the output queue
allItemsToBeDeliveredPromises.forEach(p => this.push(p));
this.push(null); // terminate the stream after all these
}
});
// let's declare the logic to process each promise
// (depending on `timeout()` being done)
const consumer = new Writable({
write: async function (promise, uselessEncoding, callback) {
try {
const order = await promise; // wait for the current promise to be completed
} catch (e) {
/* delivery error, do something cool like a coupon */
return callback(e); // or return callback() without e if you don't want to crash the pipe
}
if (customerHasLeft) { /* find a waiter (you can still `await`) and deliver `order` */ }
else { /* deliver `order` at the counter */ }
callback(); // tell the underlying queue we can process the next promise now
}
});
// launch the whole pipe
input.pipe(consumer);
// you can add listeners on all events you'd like:
// 'error', 'close', 'data', whatever...
编辑:实际上我们希望在承诺解决时按顺序处理承诺(即所有承诺的单个 post 流程)
let customerHasLeft = false;
timeout() // your code...
.then(() => { customerHasLeft = true; }); // ... made boolean
const allItemsToBeDeliveredPromises = [fetchMyItem1(), fetchMyItem2(), ...];
const postProcessChain = Promise.resolve(); // start with a promise ready to be thened
// add a next step to each promise so that as soon as one resolves, it registers
// as a next step to the post-process chain
allItemsToBeDeliveredPromises.forEach(p => p.then(order => postProcessChain.then(async () => {
// do something potentially async with the resulting order, like this:
if (customerHasLeft) { /* find a waiter (you can still `await`) and deliver `order` */ }
else { /* deliver `order` at the counter */ }
})));