如何在飞镖中实现具有多个并发工作者(异步)的异步任务队列
How to implement an async task queue with multiple concurrent workers (async) in dart
我的目标是在 dart 中创建一种网络爬虫。为此,我想维护一个任务队列,其中存储需要抓取的元素(例如 URL)。元素在 returns 需要处理的更多元素的列表中的爬网函数中进行爬网。因此,这些元素被添加到队列中。示例代码:
import "dart:collection";
final queue = Queue<String>();
main() async{
queue
..add("...")
..add("...")
..add("...");
while (queue.isNotEmpty) {
results = await crawl(queue.removeFirst());
queue.addAll(results);
}
}
Future<List<String>> crawl(String x) async {
...
res = await http.get(x)
...
return results;
}
这段粗略的代码一次只处理一个元素。但是,我希望有一个工作池(例如 5 个)从队列中取出元素并同时处理它们,然后将结果添加回队列。由于瓶颈是 HTTP 请求,我认为 Future.wait() 调用多个 worker 可以加快执行速度。但是我不想让服务器超载,因此我也想限制工作人员的数量。
这可以用基本的异步原语和信号量来实现吗?我想尽可能避免隔离,以使解决方案尽可能简单。
我不知道那里是否已经有一个包提供了这个功能,但是因为编写你自己的逻辑并不那么复杂,所以我做了下面的例子:
import 'dart:async';
import 'dart:collection';
import 'dart:math';
class TaskRunner<A, B> {
final Queue<A> _input = Queue();
final StreamController<B> _streamController = StreamController();
final Future<B> Function(A) task;
final int maxConcurrentTasks;
int runningTasks = 0;
TaskRunner(this.task, {this.maxConcurrentTasks = 5});
Stream<B> get stream => _streamController.stream;
void add(A value) {
_input.add(value);
_startExecution();
}
void addAll(Iterable<A> iterable) {
_input.addAll(iterable);
_startExecution();
}
void _startExecution() {
if (runningTasks == maxConcurrentTasks || _input.isEmpty) {
return;
}
while (_input.isNotEmpty && runningTasks < maxConcurrentTasks) {
runningTasks++;
print('Concurrent workers: $runningTasks');
task(_input.removeFirst()).then((value) async {
_streamController.add(value);
while (_input.isNotEmpty) {
_streamController.add(await task(_input.removeFirst()));
}
runningTasks--;
print('Concurrent workers: $runningTasks');
});
}
}
}
Random _rnd = Random();
Future<List<String>> crawl(String x) =>
Future.delayed(Duration(seconds: _rnd.nextInt(5)), () => x.split('-'));
void main() {
final runner = TaskRunner(crawl, maxConcurrentTasks: 3);
runner.stream.forEach((listOfString) {
if (listOfString.length == 1) {
print('DONE: ${listOfString.first}');
} else {
print('PUTTING STRINGS ON QUEUE: $listOfString');
runner.addAll(listOfString);
}
});
runner.addAll(['1-2-3-4-5-6-7-8-9', '10-20-30-40-50-60-70-80-90']);
}
输出:
Concurrent workers: 1
Concurrent workers: 2
Concurrent workers: 1
PUTTING STRINGS ON QUEUE: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Concurrent workers: 2
Concurrent workers: 3
Concurrent workers: 4
PUTTING STRINGS ON QUEUE: [10, 20, 30, 40, 50, 60, 70, 80, 90]
DONE: 3
DONE: 5
DONE: 1
DONE: 2
DONE: 7
DONE: 4
DONE: 6
DONE: 10
DONE: 8
DONE: 9
DONE: 30
DONE: 20
DONE: 40
DONE: 50
Concurrent workers: 3
DONE: 90
Concurrent workers: 2
DONE: 60
Concurrent workers: 1
DONE: 80
Concurrent workers: 0
DONE: 70
我确信 class 的可用性可以提高,但我认为核心概念很容易理解。我们定义了一个 Queue
的概念,每次我们向这个 Queue
添加东西时,我们都会检查我们是否可以开始执行新的异步任务。否则我们只是跳过它,因为我们确保每个当前 运行ning 异步任务将在“关闭”之前检查 Queue
上的更多内容。
结果由 Stream
返回,您可以订阅它,例如根据我在示例中显示的结果向 TaskRunner
添加更多内容。返回数据的顺序基于它们完成的顺序。
重要的是,这不是 运行 多线程任务的方法。所有代码 运行 都在单个 Dart 隔离线程中,但是因为 HTTP 请求是 IO 延迟的,所以尝试生成多个 Future
并等待结果是有意义的。
我的目标是在 dart 中创建一种网络爬虫。为此,我想维护一个任务队列,其中存储需要抓取的元素(例如 URL)。元素在 returns 需要处理的更多元素的列表中的爬网函数中进行爬网。因此,这些元素被添加到队列中。示例代码:
import "dart:collection";
final queue = Queue<String>();
main() async{
queue
..add("...")
..add("...")
..add("...");
while (queue.isNotEmpty) {
results = await crawl(queue.removeFirst());
queue.addAll(results);
}
}
Future<List<String>> crawl(String x) async {
...
res = await http.get(x)
...
return results;
}
这段粗略的代码一次只处理一个元素。但是,我希望有一个工作池(例如 5 个)从队列中取出元素并同时处理它们,然后将结果添加回队列。由于瓶颈是 HTTP 请求,我认为 Future.wait() 调用多个 worker 可以加快执行速度。但是我不想让服务器超载,因此我也想限制工作人员的数量。
这可以用基本的异步原语和信号量来实现吗?我想尽可能避免隔离,以使解决方案尽可能简单。
我不知道那里是否已经有一个包提供了这个功能,但是因为编写你自己的逻辑并不那么复杂,所以我做了下面的例子:
import 'dart:async';
import 'dart:collection';
import 'dart:math';
class TaskRunner<A, B> {
final Queue<A> _input = Queue();
final StreamController<B> _streamController = StreamController();
final Future<B> Function(A) task;
final int maxConcurrentTasks;
int runningTasks = 0;
TaskRunner(this.task, {this.maxConcurrentTasks = 5});
Stream<B> get stream => _streamController.stream;
void add(A value) {
_input.add(value);
_startExecution();
}
void addAll(Iterable<A> iterable) {
_input.addAll(iterable);
_startExecution();
}
void _startExecution() {
if (runningTasks == maxConcurrentTasks || _input.isEmpty) {
return;
}
while (_input.isNotEmpty && runningTasks < maxConcurrentTasks) {
runningTasks++;
print('Concurrent workers: $runningTasks');
task(_input.removeFirst()).then((value) async {
_streamController.add(value);
while (_input.isNotEmpty) {
_streamController.add(await task(_input.removeFirst()));
}
runningTasks--;
print('Concurrent workers: $runningTasks');
});
}
}
}
Random _rnd = Random();
Future<List<String>> crawl(String x) =>
Future.delayed(Duration(seconds: _rnd.nextInt(5)), () => x.split('-'));
void main() {
final runner = TaskRunner(crawl, maxConcurrentTasks: 3);
runner.stream.forEach((listOfString) {
if (listOfString.length == 1) {
print('DONE: ${listOfString.first}');
} else {
print('PUTTING STRINGS ON QUEUE: $listOfString');
runner.addAll(listOfString);
}
});
runner.addAll(['1-2-3-4-5-6-7-8-9', '10-20-30-40-50-60-70-80-90']);
}
输出:
Concurrent workers: 1
Concurrent workers: 2
Concurrent workers: 1
PUTTING STRINGS ON QUEUE: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Concurrent workers: 2
Concurrent workers: 3
Concurrent workers: 4
PUTTING STRINGS ON QUEUE: [10, 20, 30, 40, 50, 60, 70, 80, 90]
DONE: 3
DONE: 5
DONE: 1
DONE: 2
DONE: 7
DONE: 4
DONE: 6
DONE: 10
DONE: 8
DONE: 9
DONE: 30
DONE: 20
DONE: 40
DONE: 50
Concurrent workers: 3
DONE: 90
Concurrent workers: 2
DONE: 60
Concurrent workers: 1
DONE: 80
Concurrent workers: 0
DONE: 70
我确信 class 的可用性可以提高,但我认为核心概念很容易理解。我们定义了一个 Queue
的概念,每次我们向这个 Queue
添加东西时,我们都会检查我们是否可以开始执行新的异步任务。否则我们只是跳过它,因为我们确保每个当前 运行ning 异步任务将在“关闭”之前检查 Queue
上的更多内容。
结果由 Stream
返回,您可以订阅它,例如根据我在示例中显示的结果向 TaskRunner
添加更多内容。返回数据的顺序基于它们完成的顺序。
重要的是,这不是 运行 多线程任务的方法。所有代码 运行 都在单个 Dart 隔离线程中,但是因为 HTTP 请求是 IO 延迟的,所以尝试生成多个 Future
并等待结果是有意义的。