如何在飞镖中实现具有多个并发工作者(异步)的异步任务队列

How to implement an async task queue with multiple concurrent workers (async) in dart

我的目标是在 dart 中创建一种网络爬虫。为此,我想维护一个任务队列,其中存储需要抓取的元素(例如 URL)。元素在 returns 需要处理的更多元素的列表中的爬网函数中进行爬网。因此,这些元素被添加到队列中。示例代码:

import "dart:collection";
final queue = Queue<String>();
main() async{
  queue
    ..add("...")
    ..add("...")
    ..add("...");
  while (queue.isNotEmpty) {
    results = await crawl(queue.removeFirst());
    queue.addAll(results);
  }
}

Future<List<String>> crawl(String x) async {
  ...
  res = await http.get(x)
  ...
  return results;
}

这段粗略的代码一次只处理一个元素。但是,我希望有一个工作池(例如 5 个)从队列中取出元素并同时处理它们,然后将结果添加回队列。由于瓶颈是 HTTP 请求,我认为 Future.wait() 调用多个 worker 可以加快执行速度。但是我不想让服务器超载,因此我也想限制工作人员的数量。

这可以用基本的异步原语和信号量来实现吗?我想尽可能避免隔离,以使解决方案尽可能简单。

我不知道那里是否已经有一个包提供了这个功能,但是因为编写你自己的逻辑并不那么复杂,所以我做了下面的例子:

import 'dart:async';
import 'dart:collection';
import 'dart:math';

class TaskRunner<A, B> {
  final Queue<A> _input = Queue();
  final StreamController<B> _streamController = StreamController();
  final Future<B> Function(A) task;

  final int maxConcurrentTasks;
  int runningTasks = 0;

  TaskRunner(this.task, {this.maxConcurrentTasks = 5});

  Stream<B> get stream => _streamController.stream;

  void add(A value) {
    _input.add(value);
    _startExecution();
  }

  void addAll(Iterable<A> iterable) {
    _input.addAll(iterable);
    _startExecution();
  }

  void _startExecution() {
    if (runningTasks == maxConcurrentTasks || _input.isEmpty) {
      return;
    }

    while (_input.isNotEmpty && runningTasks < maxConcurrentTasks) {
      runningTasks++;
      print('Concurrent workers: $runningTasks');

      task(_input.removeFirst()).then((value) async {
        _streamController.add(value);

        while (_input.isNotEmpty) {
          _streamController.add(await task(_input.removeFirst()));
        }

        runningTasks--;
        print('Concurrent workers: $runningTasks');
      });
    }
  }
}

Random _rnd = Random();
Future<List<String>> crawl(String x) =>
    Future.delayed(Duration(seconds: _rnd.nextInt(5)), () => x.split('-'));

void main() {
  final runner = TaskRunner(crawl, maxConcurrentTasks: 3);

  runner.stream.forEach((listOfString) {
    if (listOfString.length == 1) {
      print('DONE: ${listOfString.first}');
    } else {
      print('PUTTING STRINGS ON QUEUE: $listOfString');
      runner.addAll(listOfString);
    }
  });

  runner.addAll(['1-2-3-4-5-6-7-8-9', '10-20-30-40-50-60-70-80-90']);
}

输出:

Concurrent workers: 1
Concurrent workers: 2
Concurrent workers: 1
PUTTING STRINGS ON QUEUE: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Concurrent workers: 2
Concurrent workers: 3
Concurrent workers: 4
PUTTING STRINGS ON QUEUE: [10, 20, 30, 40, 50, 60, 70, 80, 90]
DONE: 3
DONE: 5
DONE: 1
DONE: 2
DONE: 7
DONE: 4
DONE: 6
DONE: 10
DONE: 8
DONE: 9
DONE: 30
DONE: 20
DONE: 40
DONE: 50
Concurrent workers: 3
DONE: 90
Concurrent workers: 2
DONE: 60
Concurrent workers: 1
DONE: 80
Concurrent workers: 0
DONE: 70

我确信 class 的可用性可以提高,但我认为核心概念很容易理解。我们定义了一个 Queue 的概念,每次我们向这个 Queue 添加东西时,我们都会检查我们是否可以开始执行新的异步任务。否则我们只是跳过它,因为我们确保每个当前 运行ning 异步任务将在“关闭”之前检查 Queue 上的更多内容。

结果由 Stream 返回,您可以订阅它,例如根据我在示例中显示的结果向 TaskRunner 添加更多内容。返回数据的顺序基于它们完成的顺序。

重要的是,这不是 运行 多线程任务的方法。所有代码 运行 都在单个 Dart 隔离线程中,但是因为 HTTP 请求是 IO 延迟的,所以尝试生成多个 Future 并等待结果是有意义的。