我如何在 redux-saga 中以有限的并发性和理智的取消来实现批处理任务?
How do I implement a batch task in redux-saga with limited concurrency and sane cancellation?
我正在尝试通过 redux-saga 实现图片上传。我需要包括的功能是:
并发上传限制。这是通过使用 channel
作为 described in saga docs
来实现的
我收听的动作,START_UPLOADS
在下面的代码中,包含一个(可能很长)文件数组,这些文件单独发布到频道。
我需要能够通过另一个操作取消所有当前上传,CANCEL_ACTION
,包括那些到达任何 START_UPLOADS 但尚未发布到频道的,以及目前正在 uploadImage
工作人员中处理的那些。
我得到的代码如下。我的问题是 cancelAll
处理程序是在 finally
块重启传奇之后执行的,总的来说我似乎需要重启一切。它看起来笨重且容易出错。您能否就这是否是 Sagas 的使用方式提供任何建议?
function* uploadImage(file) {
const config = yield getConfig();
const getRequest = new SagaRequest();
console.log("Making async request here.");
}
function* consumeImages(uploadRequestsChannel) {
while (true) {
const fileAdded = yield take(uploadRequestsChannel);
// process the request
yield* uploadImage(fileAdded);
}
}
function* uploadImagesSaga() {
const CONCURRENT_UPLOADS = 10;
const uploadRequestsChannel = yield call(channel);
let workers = [];
function* scheduleWorkers() {
workers = [];
for (let i = 0; i < CONCURRENT_UPLOADS; i++) {
const worker = yield fork(consumeImages, uploadRequestsChannel);
workers.push(worker);
}
}
let listener;
yield* scheduleWorkers();
function* cancelAll() {
// cancel producer and consumers, flush channel
yield cancel(listener);
for (const worker of workers) {
yield cancel(worker);
}
yield flush(uploadRequestsChannel);
}
function* putToChannel(chan, task) {
return yield put(chan, task);
}
function* listenToUploads() {
try {
while (true) {
const { filesAdded } = yield take(START_UPLOADS);
for (const fileAdded of filesAdded) {
yield fork(putToChannel, uploadRequestsChannel, fileAdded);
}
}
} finally {
// if cancelled, restart consumers and producer
yield* scheduleWorkers();
listener = yield fork(listenToUploads);
}
}
listener = yield fork(listenToUploads);
while (true) {
yield take(CANCEL_ACTION);
yield call(cancelAll);
}
}
export default uploadImagesSaga;
编辑:在此处提炼到沙箱中:https://codesandbox.io/s/cancellable-counter-example-qomw6
我喜欢使用 race
来取消 - 比赛的解析值是一个具有一个键和值的对象("winning" 任务)。 redux-saga race() docs
const result = yield race({
cancel: take(CANCEL_ACTION),
listener: call(listenToUploads), // use blocking `call`, not fork
});
if (result.cancel) {
yield call(cancelAll)
}
^ 这可以包含在 while (true)
循环中,因此您应该能够合并原始示例中重复的 fork()。如果工人需要 re-scheduled,您可以考虑在 cancelAll
.
内处理它
我更喜欢让 outer-task 处理重启,而不是从它们自己的 finally
块中调用任务。
编辑:重构示例沙箱 https://codesandbox.io/s/cancellable-counter-example-j5vxr
我正在尝试通过 redux-saga 实现图片上传。我需要包括的功能是:
并发上传限制。这是通过使用
channel
作为 described in saga docs 来实现的
我收听的动作,
START_UPLOADS
在下面的代码中,包含一个(可能很长)文件数组,这些文件单独发布到频道。我需要能够通过另一个操作取消所有当前上传,
CANCEL_ACTION
,包括那些到达任何 START_UPLOADS 但尚未发布到频道的,以及目前正在uploadImage
工作人员中处理的那些。
我得到的代码如下。我的问题是 cancelAll
处理程序是在 finally
块重启传奇之后执行的,总的来说我似乎需要重启一切。它看起来笨重且容易出错。您能否就这是否是 Sagas 的使用方式提供任何建议?
function* uploadImage(file) {
const config = yield getConfig();
const getRequest = new SagaRequest();
console.log("Making async request here.");
}
function* consumeImages(uploadRequestsChannel) {
while (true) {
const fileAdded = yield take(uploadRequestsChannel);
// process the request
yield* uploadImage(fileAdded);
}
}
function* uploadImagesSaga() {
const CONCURRENT_UPLOADS = 10;
const uploadRequestsChannel = yield call(channel);
let workers = [];
function* scheduleWorkers() {
workers = [];
for (let i = 0; i < CONCURRENT_UPLOADS; i++) {
const worker = yield fork(consumeImages, uploadRequestsChannel);
workers.push(worker);
}
}
let listener;
yield* scheduleWorkers();
function* cancelAll() {
// cancel producer and consumers, flush channel
yield cancel(listener);
for (const worker of workers) {
yield cancel(worker);
}
yield flush(uploadRequestsChannel);
}
function* putToChannel(chan, task) {
return yield put(chan, task);
}
function* listenToUploads() {
try {
while (true) {
const { filesAdded } = yield take(START_UPLOADS);
for (const fileAdded of filesAdded) {
yield fork(putToChannel, uploadRequestsChannel, fileAdded);
}
}
} finally {
// if cancelled, restart consumers and producer
yield* scheduleWorkers();
listener = yield fork(listenToUploads);
}
}
listener = yield fork(listenToUploads);
while (true) {
yield take(CANCEL_ACTION);
yield call(cancelAll);
}
}
export default uploadImagesSaga;
编辑:在此处提炼到沙箱中:https://codesandbox.io/s/cancellable-counter-example-qomw6
我喜欢使用 race
来取消 - 比赛的解析值是一个具有一个键和值的对象("winning" 任务)。 redux-saga race() docs
const result = yield race({
cancel: take(CANCEL_ACTION),
listener: call(listenToUploads), // use blocking `call`, not fork
});
if (result.cancel) {
yield call(cancelAll)
}
^ 这可以包含在 while (true)
循环中,因此您应该能够合并原始示例中重复的 fork()。如果工人需要 re-scheduled,您可以考虑在 cancelAll
.
我更喜欢让 outer-task 处理重启,而不是从它们自己的 finally
块中调用任务。
编辑:重构示例沙箱 https://codesandbox.io/s/cancellable-counter-example-j5vxr