如何在 RxJS v5 中暂停和缓冲 Observable
How to pause and buffer Observables in RxJS v5
我正在尝试对 HTTP 请求实施背压策略,以在特定条件下暂时阻止未决请求数秒。暂停的逻辑将基于另一个 Observable。
我的研究和理解使我相信 pausableBuffered
运算符完全符合我的需要。此处记录 http://reactivex.io/documentation/operators/backpressure.html.
但是我在 ReactiveX v5 (5.0.0-beta.0) 中找不到这个运算符,迁移指南 (v4 - v5) 似乎表明它们已被删除。如果是这种情况,我怎样才能使用 v5 可用的运算符达到预期的结果?
背压故事现在完全是 dropped。
这是获得相同结果的一种方法:
const pausableBuffered = (observable, pauser) => {
const subj = new rx.Subject();
let buffer = [];
const nextEmitter = x => subj.next(x);
const nextBuffer = x => buffer.push(x);
let subscriber = nextEmitter;
observable.subscribe(x => subscriber(x));
pauser.subscribe(value => {
if (value) {
subscriber = nextBuffer;
} else {
buffer.forEach(nextEmitter);
buffer = [];
subscriber = nextEmitter;
}
})
return subj;
};
我偶然发现了这个答案,对于我的用例,我把它变成了一个管道
import { Observable, Subject, Subscription } from "rxjs";
export function pausable(pauseToken: Observable<boolean>, startPuased: boolean, lastOnly: boolean) {
return function <T>(source: Subject<T>): Observable<T> {
let buffer: T[] = [];
const nextEmitter = (x: T) => subj.next(x);
const nextBuffer = (x: any) => buffer.push(x);
var sourceSubscription: Subscription;
var pauseSubscription: Subscription;
var subj = new Subject<T>();
let subscriber = nextEmitter;
if (startPuased) {
subscriber = nextBuffer;
}
sourceSubscription = source.subscribe({
next(value) {
subscriber(value);
},
error(error) {
subj.error(error);
},
complete() {
subj.complete();
pauseSubscription?.unsubscribe();
}
})
pauseSubscription = pauseToken.subscribe({
next(value) {
if (value) {
subscriber = nextBuffer;
} else {
if (lastOnly && buffer.length > 0) {
nextEmitter(buffer.pop())
} else {
buffer.forEach(nextEmitter);
}
buffer = [];
subscriber = nextEmitter;
}
},
complete() {
sourceSubscription?.unsubscribe();
pauseSubscription?.unsubscribe();
}
});
return subj;
}
}
我正在尝试对 HTTP 请求实施背压策略,以在特定条件下暂时阻止未决请求数秒。暂停的逻辑将基于另一个 Observable。
我的研究和理解使我相信 pausableBuffered
运算符完全符合我的需要。此处记录 http://reactivex.io/documentation/operators/backpressure.html.
但是我在 ReactiveX v5 (5.0.0-beta.0) 中找不到这个运算符,迁移指南 (v4 - v5) 似乎表明它们已被删除。如果是这种情况,我怎样才能使用 v5 可用的运算符达到预期的结果?
背压故事现在完全是 dropped。
这是获得相同结果的一种方法:
const pausableBuffered = (observable, pauser) => {
const subj = new rx.Subject();
let buffer = [];
const nextEmitter = x => subj.next(x);
const nextBuffer = x => buffer.push(x);
let subscriber = nextEmitter;
observable.subscribe(x => subscriber(x));
pauser.subscribe(value => {
if (value) {
subscriber = nextBuffer;
} else {
buffer.forEach(nextEmitter);
buffer = [];
subscriber = nextEmitter;
}
})
return subj;
};
我偶然发现了这个答案,对于我的用例,我把它变成了一个管道
import { Observable, Subject, Subscription } from "rxjs";
export function pausable(pauseToken: Observable<boolean>, startPuased: boolean, lastOnly: boolean) {
return function <T>(source: Subject<T>): Observable<T> {
let buffer: T[] = [];
const nextEmitter = (x: T) => subj.next(x);
const nextBuffer = (x: any) => buffer.push(x);
var sourceSubscription: Subscription;
var pauseSubscription: Subscription;
var subj = new Subject<T>();
let subscriber = nextEmitter;
if (startPuased) {
subscriber = nextBuffer;
}
sourceSubscription = source.subscribe({
next(value) {
subscriber(value);
},
error(error) {
subj.error(error);
},
complete() {
subj.complete();
pauseSubscription?.unsubscribe();
}
})
pauseSubscription = pauseToken.subscribe({
next(value) {
if (value) {
subscriber = nextBuffer;
} else {
if (lastOnly && buffer.length > 0) {
nextEmitter(buffer.pop())
} else {
buffer.forEach(nextEmitter);
}
buffer = [];
subscriber = nextEmitter;
}
},
complete() {
sourceSubscription?.unsubscribe();
pauseSubscription?.unsubscribe();
}
});
return subj;
}
}