将 setInterval() 与数组中的 .map 方法一起使用并作为承诺返回

Using setInterval() with a .map method in array and returning as a promise

我有一个函数,它接受一个 ID 列表,将它们转换成一个数组 URL,然后使用 map 函数来触发一个获取请求。它工作得很好,但它触发得太快并且提供者抛出错误,因为我们经常点击 API 。我需要为请求设置一个时间间隔,但每次我这样做时,它都不起作用。想法?

async function getReports(reportIDs) {
    const urls = reportIDs.map(id => `https://api.data.com/api/v1/report/${id}/?include_datasets=true`);
    const requests = urls.map(url => fetch(url, {
        method: 'GET',
        headers: { 'api-key': key }
    }).then(res => res.json()));
    
    const responses = await Promise.all(requests).catch(err => console.error(err));
    return responses;
}

我使用了一个承诺,所以我可以 await 另一个函数中的函数结果来转换数据集。

想法?

我认为最好的办法是使用睡眠功能。

async function sleep() {
  return new Promise((resolve) => setTimeout(resolve, 300));
}

await sleep()

如果是 node,有一个相当轻量级的 npm 包,名为 bottleneck,它将限制请求。它工作得很好并且被广泛使用。或者你可以看看那个然后自己动手。

“简单是一种伟大的美德,但需要努力工作才能实现它,需要教育才能欣赏它。更糟糕的是:复杂性更好卖。”Edsger W. Dijkstra

公认的“轻量级”解决方案是将近 20,000 行代码,并且依赖于 CoffeeScript 和 Lua。如果您可以将所有这些换成 50 行 JavaScript?

会怎么样

假设我们有一些 job 需要一些时间来计算一些结果 -

async function job(x) {
  // job consumes some time
  await sleep(rand(5000))
  // job computes a result
  return x * 10
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(job))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

这 运行 一次完成所有十二 (12) 个作业。如果这些是对远程的请求,一些连接可能会被拒绝,因为你正在用太多的并发流量淹没服务器。通过建模 Pool 个线程,我们控制并行作业的流程 -

// my pool with four threads
const pool = new Pool(4)

async function jobQueued(x) {
  // wait for pool thread
  const close = await pool.open()
  // run the job and close the thread upon completion
  return job(x).then(close)
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

函数应该很小并且只做一件事。这使得编写单个功能更容易,并提高了可重用性,允许您将几个简单的功能组合成更复杂的功能。在上面你已经看到 randsleep -

const rand = x =>
  Math.random() * x

const sleep = ms =>
  new Promise(r => setTimeout(r, ms))

如果我们想throttle每个作业,我们可以专门化sleep以确保最小运行时间-

const throttle = (p, ms) =>
  Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

async function jobQueued(x) {
  const close = await pool.open()
  // ensure job takes at least 3 seconds before freeing thread
  return throttle(job(x), 3000).then(close)
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

我们可以添加一些 console.log 消息以确保一切正常 运行。并且我们会在job的开头加一个随机的sleep,表示任务可以任意顺序排队,不影响结果的顺序-

async function jobQueued(x) {
  await sleep(rand(5000))
  console.log("queueing", x)
  const close = await pool.open()
  console.log("  sending", x)
  const result = await throttle(job(x), 3000).then(close)
  console.log("    received", result)
  return result
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
console.log thread 1 thread 2 thread 3 thread 4
queueing 12
   sending 12 open
queueing 9
   sending 9 open
queueing 8
   sending 8 open
queueing 4
   sending 4 open
queueing 10
queueing 6
queueing 7
queueing 2
queueing 11
      received 120 closed
   sending 11 open
queueing 3
queueing 5
queueing 1
      received 80 closed
   sending 1 open
      received 90 closed
   sending 5 open
      received 110 closed
   sending 3 open
      received 40 closed
   sending 2 open
      received 10 closed
   sending 7 open
      received 50 closed
   sending 6 open
      received 20 closed
   sending 10 open
      received 30 closed
      received 70 closed
      received 60 closed
      received 100 closed
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

上面,我们的 pool 是用 size=4 初始化的,因此最多可以同时 运行 四个作业。在我们看到 sending 四次之后,必须完成一个作业,在下一个作业开始之前我们会看到 receivedqueueing 随时可能发生。您可能还会注意到 Pool 使用高效的后进先出 (LIFO) 顺序处理排队的作业,但结果的顺序保持不变。

继续我们的实现,就像我们的其他函数一样,我们可以用一种简单的方式写 thread -

const effect = f => x =>
  (f(x), x)

const thread = close =>
  [new Promise(r => { close = effect(r) }), close]

function main () {
  const [t, close] = thread()
  console.log("please wait...")
  setTimeout(close, 3000)
  return t.then(_ => "some result")
}

main().then(console.log, console.error)

please wait...
(3 seconds later)
some result

现在我们可以使用 thread 编写更复杂的功能,例如 Pool -

class Pool {
  constructor (size = 4) {
    Object.assign(this, { pool: new Set, stack: [], size })
  }
  open () {
    return this.pool.size < this.size
      ? this.deferNow()
      : this.deferStacked()
  }
  deferNow () {
    const [t, close] = thread()
    const p = t
      .then(_ => this.pool.delete(p))
      .then(_ => this.stack.length && this.stack.pop().close())
    this.pool.add(p)
    return close
  }
  deferStacked () {
    const [t, close] = thread()
    this.stack.push({ close })
    return t.then(_ => this.deferNow())
  }
}

这样你的程序就完成了。在下面的功能演示中,我压缩了定义,以便我们可以一次看到它们。 运行程序在自己的浏览器中验证结果-

class Pool {
  constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
  open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
  deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
  deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))
const throttle = (p, ms) => Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

const myJob = x => sleep(rand(5000)).then(_ => x * 10)
const pool = new Pool(4)

async function jobQueued(x) {
  await sleep(rand(5000))
  console.log("queueing", x)
  const close = await pool.open()
  console.log("  sending", x)
  const result = await throttle(myJob(x), 3000).then(close)
  console.log("    received", result)
  return result
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(JSON.stringify)
  .then(console.log, console.error)
.as-console-wrapper { min-height: 100%; }

希望您从 JavaScript 中学到了一些有趣的东西!如果您喜欢这个,请尝试扩展 Pool 功能。也许添加一个简单的 timeout 函数来确保作业在一定时间内完成。或者可以添加一个 retry 函数,如果它产生错误或超时,它会重新 运行 一个作业。要查看 Pool 应用于另一个问题,请参阅 。如果您有任何疑问,我很乐意为您提供帮助:D

首先修改您的代码,使其调用名为 waitAndFetch() 的函数。您将传入 URL 加上数组中该元素的索引值 (0,1,2,3...):

async function getReports(reportIDs) {
    const urls = reportIDs.map(id => `https://api.data.com/api/v1/report/${id}/?include_datasets=true`);
    
    const requests = urls.map((url,i) => waitAndFetch);
    
    const responses = await Promise.all(requests).catch(err => console.error(err));
    return responses;
}

接下来,创建一个简单的函数 returns 一个在 1000 x 索引值毫秒(0、1sec、2sec、3sec...)内解决的承诺:

const wait = (i) => {
    const ms = i * 1000;
    return new Promise((resolve, reject) => {
        setTimeout(resolve, ms);
    });
};

现在写waitAndFetch。它会调用 wait(),但不会关心它的 return 值。它只关心它必须等待它解决:

const waitAndFetch = async (url, i) => {
    await wait(i);
    const response = fetch(url, {
        method: 'GET',
        headers: { 'api-key': key }
    });
    return response.json();
};