如何使用 RxJS 处理数百个请求而不会崩溃?
How do I use RxJS to process hundreds of requests without crashing?
我需要处理 400 多个员工对象,其中涉及为每个对象调用一个可观察的服务方法。
我需要知道何时处理完所有员工,并且我需要找到一种方法来执行此操作,不会导致我的客户端或服务器因同时请求而过载 - 这就是目前正在发生的事情。
我想将每个员工的处理结果收集到一个数组中,我可以从中进一步处理数据(我正在创建一个导出文件)。
我一直在尝试 RxJS concat 和 forkjoin 但我似乎仍然有 400 多个任务同时启动。
I read this page 关于 concat 这表明处理是一次处理一个,(就像一个在 ATM 排队),但我不相信 - 我的浏览器再次挂起,同时向我建议的网络请求数量。
如果我能让代码一次处理一个员工并在处理列表中的下一个员工之前完成,那就没问题了。
processEmployees(): AsyncSubject<boolean> {
let employeesProcessed: AsyncSubject<boolean> = new AsyncSubject<boolean>();
let listOfProcessesToRun = [];
this.employeeProfilesList.forEach(
(employeeProfile: EmployeeProfile) => {
listOfProcessesToRun.push(
this.annualLeaveCalculationService.startProcess(employeeProfile, this.yearToProcess).delay(2000)
);
}
)
Observable.concat(listOfProcessesToRun)
.finally(() => {
})
.subscribe(
(calculationResults: AnnualLeaveCalculationResults) => {
this.calculationResults.push(calculationResults);
employeesProcessed.complete();
},
error => {
});
return employeesProcessed;
}
我曾尝试使用 Observable.forkjoin 来处理一组可观察对象,但我相信请求是并行处理的,结果由于有大约 400 名员工要处理,我的机器挂了。
我考虑过嵌套解决方案,例如
processemployee(X) {
doServiceCall(this.employees[x])
.subscribe( response => {
processemployee(++x); // with a check to detect number of loops
}
);
}
但我不确定这是不是一个很好的模式。
您可以使用 .mergeMap
重载,它接受 concurreny
并将其设置为适合您的服务的东西(例如 20req concurrent)。这样你就可以优化最大吞吐量而不会淹没你的服务器。
我最近遇到了类似的问题,如果我理解正确的话。
我有数千个请求需要并行处理。数字太高,整个都崩了。
然后我被指示使用 bufferCount()
和 concatMap()
运算符的组合来实现我想要的。
bufferCount()
将原始流分成块,全部包含
相同数量的元素
concatMap()
通过函数详细说明第一个块(并行)
作为参数传递 - 函数 returns 一个 Observable - 当
Observable 完成,然后获取第二个块,依此类推
也许这种方法可以提供帮助。
我需要处理 400 多个员工对象,其中涉及为每个对象调用一个可观察的服务方法。
我需要知道何时处理完所有员工,并且我需要找到一种方法来执行此操作,不会导致我的客户端或服务器因同时请求而过载 - 这就是目前正在发生的事情。
我想将每个员工的处理结果收集到一个数组中,我可以从中进一步处理数据(我正在创建一个导出文件)。
我一直在尝试 RxJS concat 和 forkjoin 但我似乎仍然有 400 多个任务同时启动。
I read this page 关于 concat 这表明处理是一次处理一个,(就像一个在 ATM 排队),但我不相信 - 我的浏览器再次挂起,同时向我建议的网络请求数量。
如果我能让代码一次处理一个员工并在处理列表中的下一个员工之前完成,那就没问题了。
processEmployees(): AsyncSubject<boolean> {
let employeesProcessed: AsyncSubject<boolean> = new AsyncSubject<boolean>();
let listOfProcessesToRun = [];
this.employeeProfilesList.forEach(
(employeeProfile: EmployeeProfile) => {
listOfProcessesToRun.push(
this.annualLeaveCalculationService.startProcess(employeeProfile, this.yearToProcess).delay(2000)
);
}
)
Observable.concat(listOfProcessesToRun)
.finally(() => {
})
.subscribe(
(calculationResults: AnnualLeaveCalculationResults) => {
this.calculationResults.push(calculationResults);
employeesProcessed.complete();
},
error => {
});
return employeesProcessed;
}
我曾尝试使用 Observable.forkjoin 来处理一组可观察对象,但我相信请求是并行处理的,结果由于有大约 400 名员工要处理,我的机器挂了。
我考虑过嵌套解决方案,例如
processemployee(X) {
doServiceCall(this.employees[x])
.subscribe( response => {
processemployee(++x); // with a check to detect number of loops
}
);
}
但我不确定这是不是一个很好的模式。
您可以使用 .mergeMap
重载,它接受 concurreny
并将其设置为适合您的服务的东西(例如 20req concurrent)。这样你就可以优化最大吞吐量而不会淹没你的服务器。
我最近遇到了类似的问题,如果我理解正确的话。
我有数千个请求需要并行处理。数字太高,整个都崩了。
然后我被指示使用 bufferCount()
和 concatMap()
运算符的组合来实现我想要的。
bufferCount()
将原始流分成块,全部包含 相同数量的元素concatMap()
通过函数详细说明第一个块(并行) 作为参数传递 - 函数 returns 一个 Observable - 当 Observable 完成,然后获取第二个块,依此类推
也许这种方法可以提供帮助。