退订后取消并发HTTP请求
Cancel concurrency HTTP requests after unsubscription
我必须解决以下问题:
许多 API 调用通过 API 接口(Google API)并且必须限制每个 seconds/concurrency 的请求,因为 Google API 限制.
我使用一个主题 (sink/call pool),它管理所有 API 带有 mergeMap 的请求和 returns 另一个管道主题的结果.
因为 API 请求可以在完成前取消订阅,所以它们不应该阻塞我的接收器。所以我必须在取消订阅后停止API请求(任务)。
问题:
我不知道如何正确捕获此取消订阅状态。我目前所做的是覆盖 subscribe 和 unsubscribe 以捕获此状态。它有效,但看起来 "rxjs" 不适合我。
我可以改进什么?
import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
import {mergeMap, tap} from 'rxjs/operators';
function doHeavyRequest() {
return new Observable(subscribe => {
// Simulate delay.
setTimeout(() => {
subscribe.next(1);
subscribe.complete();
}, 1000);
});
}
const sink = new Subject<[Subject<any>, number]>();
sink.pipe(
mergeMap(([subject, id]) => {
// Stop request here if already unsubscribed.
if (subject.closed) {
console.log('Request cancelled:', id);
return EMPTY;
}
return doHeavyRequest()
.pipe(
tap(res => {
if (!subject.closed) {
subject.next(res);
subject.complete();
} else {
console.log('Request aborted:', id);
}
})
);
}, 2)
).subscribe();
// Insert request into sink.
// Overwrite subscribe and unsubscribe.
// Track unsubscribe over the flag alive.
function getSomething(id: number) {
const task = new Subject();
const ob = task.asObservable();
ob.subscribe = (...args: any[]) => {
const sub = Observable.prototype.subscribe.call(ob, ...args);
sub.unsubscribe = () => {
if (!task.isStopped)
task.unsubscribe();
Subscription.prototype.unsubscribe.call(sub);
};
return sub;
};
sink.next([task, id]);
return ob;
}
// Make 3 requests and unsubscribe.
export function test() {
const ob0 = getSomething(0);
const ob1 = getSomething(1);
const ob2 = getSomething(2);
const sub0 = ob0.subscribe(e => {
console.log('0:', e);
});
setTimeout(() => sub0.unsubscribe(), 1500);
const sub1 = ob1.subscribe(e => {
console.log('1:', e);
});
setTimeout(() => sub1.unsubscribe(), 900);
const sub2 = ob2.subscribe(e => {
console.log('2:', e);
});
setTimeout(() => sub2.unsubscribe(), 100);
}
查看 plunker 的 test.ts 和控制台输出:
我不确定我是否理解正确,但看起来您想在取消订阅时做一些清理,对吗?
您可以像这样向单个订阅添加拆卸逻辑:
const subscription = obs.subscribe(() => {...})
subscription.add(() => { /* do cleanup here. This is executed upon unsubscribing. */})
也许 finalize
管道运算符也可能有用。这一个在完成时向可观察对象添加逻辑,大部分时间是在完成或取消订阅时。 hot observables 会有所不同,所以请注意。
当创建一个可观察对象时,您还可以通过从其内部逻辑函数返回一个函数来向其中添加拆卸逻辑,就像管道化 finalize
:
const obs = new Observable(subject => { /* subject.next/error/complete somewhere */
return () => { /* cleanup resources upon unsubscribe OR complete */ }
})
感谢@,使用 finalize 有效并且看起来好多了:
import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
import {mergeMap, tap, finalize} from 'rxjs/operators';
function doHeavyRequest() {
return new Observable(subscribe => {
// Simulate delay.
setTimeout(() => {
subscribe.next(1);
subscribe.complete();
}, 1000);
});
}
const sink = new Subject<[Subject<any>, number]>();
sink.pipe(
mergeMap(([subject, id]) => {
// Stop request here if already unsubscribed.
if (subject.closed) {
console.log('Request cancelled:', id);
return EMPTY;
}
return doHeavyRequest()
.pipe(
tap(res => {
if (!subject.closed) {
subject.next(res);
subject.complete();
} else {
console.log('Request aborted:', id);
}
})
);
}, 2)
).subscribe();
// Insert request into sink.
// Overwrite subscribe and unsubscribe.
// Track unsubscribe.
function getSomething(id: number) {
const task = new Subject();
const ob = task.pipe(finalize(() => {
if (!task.isStopped) {
task.unsubscribe();
}
}));
sink.next([task, id]);
return ob;
}
// Make 3 requests and unsubscribe.
export function test() {
const ob0 = getSomething(0);
const ob1 = getSomething(1);
const ob2 = getSomething(2);
const sub0 = ob0.subscribe(e => {
console.log('0:', e);
});
setTimeout(() => sub0.unsubscribe(), 1500);
const sub1 = ob1.subscribe(e => {
console.log('1:', e);
});
setTimeout(() => sub1.unsubscribe(), 900);
const sub2 = ob2.subscribe(e => {
console.log('2:', e);
});
setTimeout(() => sub2.unsubscribe(), 100);
}
输出:
0: 1
Request cancelled: 2
Request aborted: 1
我必须解决以下问题:
许多 API 调用通过 API 接口(Google API)并且必须限制每个 seconds/concurrency 的请求,因为 Google API 限制.
我使用一个主题 (sink/call pool),它管理所有 API 带有 mergeMap 的请求和 returns 另一个管道主题的结果.
因为 API 请求可以在完成前取消订阅,所以它们不应该阻塞我的接收器。所以我必须在取消订阅后停止API请求(任务)。
问题: 我不知道如何正确捕获此取消订阅状态。我目前所做的是覆盖 subscribe 和 unsubscribe 以捕获此状态。它有效,但看起来 "rxjs" 不适合我。
我可以改进什么?
import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
import {mergeMap, tap} from 'rxjs/operators';
function doHeavyRequest() {
return new Observable(subscribe => {
// Simulate delay.
setTimeout(() => {
subscribe.next(1);
subscribe.complete();
}, 1000);
});
}
const sink = new Subject<[Subject<any>, number]>();
sink.pipe(
mergeMap(([subject, id]) => {
// Stop request here if already unsubscribed.
if (subject.closed) {
console.log('Request cancelled:', id);
return EMPTY;
}
return doHeavyRequest()
.pipe(
tap(res => {
if (!subject.closed) {
subject.next(res);
subject.complete();
} else {
console.log('Request aborted:', id);
}
})
);
}, 2)
).subscribe();
// Insert request into sink.
// Overwrite subscribe and unsubscribe.
// Track unsubscribe over the flag alive.
function getSomething(id: number) {
const task = new Subject();
const ob = task.asObservable();
ob.subscribe = (...args: any[]) => {
const sub = Observable.prototype.subscribe.call(ob, ...args);
sub.unsubscribe = () => {
if (!task.isStopped)
task.unsubscribe();
Subscription.prototype.unsubscribe.call(sub);
};
return sub;
};
sink.next([task, id]);
return ob;
}
// Make 3 requests and unsubscribe.
export function test() {
const ob0 = getSomething(0);
const ob1 = getSomething(1);
const ob2 = getSomething(2);
const sub0 = ob0.subscribe(e => {
console.log('0:', e);
});
setTimeout(() => sub0.unsubscribe(), 1500);
const sub1 = ob1.subscribe(e => {
console.log('1:', e);
});
setTimeout(() => sub1.unsubscribe(), 900);
const sub2 = ob2.subscribe(e => {
console.log('2:', e);
});
setTimeout(() => sub2.unsubscribe(), 100);
}
查看 plunker 的 test.ts 和控制台输出:
我不确定我是否理解正确,但看起来您想在取消订阅时做一些清理,对吗?
您可以像这样向单个订阅添加拆卸逻辑:
const subscription = obs.subscribe(() => {...})
subscription.add(() => { /* do cleanup here. This is executed upon unsubscribing. */})
也许 finalize
管道运算符也可能有用。这一个在完成时向可观察对象添加逻辑,大部分时间是在完成或取消订阅时。 hot observables 会有所不同,所以请注意。
当创建一个可观察对象时,您还可以通过从其内部逻辑函数返回一个函数来向其中添加拆卸逻辑,就像管道化 finalize
:
const obs = new Observable(subject => { /* subject.next/error/complete somewhere */
return () => { /* cleanup resources upon unsubscribe OR complete */ }
})
感谢@
import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
import {mergeMap, tap, finalize} from 'rxjs/operators';
function doHeavyRequest() {
return new Observable(subscribe => {
// Simulate delay.
setTimeout(() => {
subscribe.next(1);
subscribe.complete();
}, 1000);
});
}
const sink = new Subject<[Subject<any>, number]>();
sink.pipe(
mergeMap(([subject, id]) => {
// Stop request here if already unsubscribed.
if (subject.closed) {
console.log('Request cancelled:', id);
return EMPTY;
}
return doHeavyRequest()
.pipe(
tap(res => {
if (!subject.closed) {
subject.next(res);
subject.complete();
} else {
console.log('Request aborted:', id);
}
})
);
}, 2)
).subscribe();
// Insert request into sink.
// Overwrite subscribe and unsubscribe.
// Track unsubscribe.
function getSomething(id: number) {
const task = new Subject();
const ob = task.pipe(finalize(() => {
if (!task.isStopped) {
task.unsubscribe();
}
}));
sink.next([task, id]);
return ob;
}
// Make 3 requests and unsubscribe.
export function test() {
const ob0 = getSomething(0);
const ob1 = getSomething(1);
const ob2 = getSomething(2);
const sub0 = ob0.subscribe(e => {
console.log('0:', e);
});
setTimeout(() => sub0.unsubscribe(), 1500);
const sub1 = ob1.subscribe(e => {
console.log('1:', e);
});
setTimeout(() => sub1.unsubscribe(), 900);
const sub2 = ob2.subscribe(e => {
console.log('2:', e);
});
setTimeout(() => sub2.unsubscribe(), 100);
}
输出:
0: 1
Request cancelled: 2
Request aborted: 1