Rxjs 如何获取在 concatMap 期间缓冲的所有值
Rxjs how to get all values that are buffered during a concatMap
考虑以下流:
interval(1000)
.pipe(
take(5),
concatMap((val) => {
console.log(val, 'process');
return of(val).pipe(delay(3000));
})
)
.subscribe((val) => console.log(val, 'emit'));
正如预期的那样,0 发出并命中了 concat 映射。我们看到 'process' 已记录。在此期间,1 和 2 已从源头发出。我想做的是说“现在我的 concatmap 已经完成,给我所有当前在流中的项目。”所以下一个发射将是 [1, 2],我们将看到“[1,2] 进程”。
我不确定如何实现。我已经尝试使用 buffer
并在每次 concatMap 发出时发出,但它从来没有在该缓冲区上获得初始发出,它使用计时器和竞赛进行黑客攻击,即使那样它也不能很好地工作。
这是我目前的解决方案:
const dequeueSignal$ = new Subject();
interval(5000)
.pipe(
buffer(dequeueSignal$),
concatMap((val) => {
console.log(val, 'process');
return of(val).pipe(
delay(getRandomInt(20000)),
tap(() => dequeueSignal$.next(null))
);
})
)
.subscribe((val) => console.log(val, 'emit'));
dequeueSignal$.next(null)
这是我能得到的最接近的。这意味着如果缓冲区中没有任何内容,它仍然会发出并且循环继续。但是,这有很大的缺点:
- 虽然队列中没有任何内容,但它不断循环(延迟只是为了调试目的。在我的真实场景中,这是一个 http 调用)
- 依赖外部调用来启动整个过程。
因此,这感觉很老套和脆弱。是否有可用于创建此场景的运算符集?
如果我理解正确,我会这样处理。
首先我们隔离源流。考虑到我们使用 share
运算符来确保 source$
流被我们稍后从 source$
.
开始创建的其他 Observable 共享。
const source$ = interval(1000).pipe(share(), take(5));
然后我认为您在示例中放置的延迟表示由某种函数执行的某种处理,可能是对异步服务的调用。如果这是真的,那么我们可以有一个 process
函数,其中 returns 一个 Observable。这个函数的模拟版本可能是这个
function process(val) {
return of(val).pipe(
delay(3000),
tap({
next: (val) => console.log(val, "processed"),
})
);
}
现在,如果所有这些假设都成立,那么我们可以定义一个 dequeueSignal$
主题,如您的示例中所示,然后定义 2 个不同的流,一个流仅采用 [=14 通知的第一个元素=] 和一个包含所有其他元素的流,像这样
const dequeueSignal$ = new Subject<any>();
const first$ = source$.pipe(
first(),
concatMap((val) => {
return process(val).pipe(
tap({
complete: () => {
dequeueSignal$.next(null);
},
})
);
})
);
const afterFirst$ = source$.pipe(skip(1)).pipe(
buffer(dequeueSignal$),
concatMap((val) => {
return process(val).pipe(
tap({
complete: () => {
dequeueSignal$.next(null);
},
})
);
})
);
dequeueSignal$
用于触发释放buffer
运算符存储的缓冲区。
dequeueSignal$
在 first$
流中被 next
ed 一次,并且在 afterFirst$
流通知的任何时候。
dequeueSignal$
的通知触发缓冲项目的释放。
Here a stackblitz 显示代码。
可能更优雅的解决方案可以作为 mergeMap
运算符 code 的变体来实现,但它可能看起来有点复杂。
自定义运算符可以满足此要求,该运算符利用 concatMap
但会根据需要修改其行为。
自定义运算符必须保持一些状态:
- 一个
bufferedNotifications
缓冲区,它存储从上游接收的元素,以防已经有一个正在运行的请求
- a
processing
布尔值,如果为真,表示当前正在处理一个请求,在 processing
期间收到的任何通知都会添加到 bufferedNotifications
重要的是要注意,自定义运算符是 returns 类型 (observable: Observable) => Observable
.
的函数
说了这么多,我们来看一下自定义算子的代码
// the custom operator requires a function as input
// this function represents the actual processing that happens in the stream
// a type T is also added for better type checking and support
function concatBufferedMap<T>(processor: (val: T[]) => Observable<T[]>) {
// as said above, the custom operator returns a function which expects
// an Obaservable as input and returns another Obaservable
return (sourceObservable: Observable<T>) =>
// the Observable to be returned is constructed using the Observable constructor
new Observable<T[]>((subscriber) => {
// this function will be called each time this
// Observable is subscribed to.
// here we define the variables holding the state
// the state is initialized any time the Observable is subscribed
let bufferedNotifications = [] as T[];
let processing = false;
// the Observable returned by the custom operator transforms the
// source observable via a pipe and then subscribes to it
const subscription = sourceObservable
.pipe(
// the first thing we do when a value is notified by the
// source observable (i.e. by upstream) is to push the value
// into the buffer
tap((val) => {
bufferedNotifications.push(val);
}),
// then we leverage concatMap to make sure that we concatenate
// the various processings
concatMap(() => {
// if we are processing a previous request
// or if there are no items in the buffer
// just complete without notifying anything
// concatMap requires that the value returned by the function
// passed in as parameter is an Observable, so in this case
// we return EMPTY, which is an Observable that does nothing
// and immediately completes
if (processing || bufferedNotifications.length === 0) {
return EMPTY;
}
// otherwise, if there is no processing on fly and there are
// values in the buffer, we return the Observable created by
// the processor function and we set the state so that it reflects
// that we have started processing something
processing = true;
// we create a copy of the buffer so that the param passed
// to the processor function is not altered by other potential
// values which can be added to the buffer during the execution
// of processor logic
const _buffer = [...bufferedNotifications];
// the buffer is also reset
bufferedNotifications = [];
return processor(_buffer).pipe(
tap({
complete: () => {
console.log("DONE concatMap");
// when processor completes its processing, the processing
// state variable is set to false
processing = false;
},
})
);
})
)
// in the subscription logic, we notify the subscriber (i.e.
// downstream) of the values notified by the processor logic
.subscribe({
next(value) {
subscriber.next(value);
},
error(err) {
// We need to make sure we're propagating our errors through.
subscriber.error(err);
},
complete() {
console.log("subscriber complete");
subscriber.complete();
},
});
// Return the teardown logic. This will be invoked when
// the result errors, completes, or is unsubscribed.
return () => {
subscription.unsubscribe();
};
});
}
This stackblitz 显示代码如何工作以及应产生预期结果。
考虑以下流:
interval(1000)
.pipe(
take(5),
concatMap((val) => {
console.log(val, 'process');
return of(val).pipe(delay(3000));
})
)
.subscribe((val) => console.log(val, 'emit'));
正如预期的那样,0 发出并命中了 concat 映射。我们看到 'process' 已记录。在此期间,1 和 2 已从源头发出。我想做的是说“现在我的 concatmap 已经完成,给我所有当前在流中的项目。”所以下一个发射将是 [1, 2],我们将看到“[1,2] 进程”。
我不确定如何实现。我已经尝试使用 buffer
并在每次 concatMap 发出时发出,但它从来没有在该缓冲区上获得初始发出,它使用计时器和竞赛进行黑客攻击,即使那样它也不能很好地工作。
这是我目前的解决方案:
const dequeueSignal$ = new Subject();
interval(5000)
.pipe(
buffer(dequeueSignal$),
concatMap((val) => {
console.log(val, 'process');
return of(val).pipe(
delay(getRandomInt(20000)),
tap(() => dequeueSignal$.next(null))
);
})
)
.subscribe((val) => console.log(val, 'emit'));
dequeueSignal$.next(null)
这是我能得到的最接近的。这意味着如果缓冲区中没有任何内容,它仍然会发出并且循环继续。但是,这有很大的缺点:
- 虽然队列中没有任何内容,但它不断循环(延迟只是为了调试目的。在我的真实场景中,这是一个 http 调用)
- 依赖外部调用来启动整个过程。
因此,这感觉很老套和脆弱。是否有可用于创建此场景的运算符集?
如果我理解正确,我会这样处理。
首先我们隔离源流。考虑到我们使用 share
运算符来确保 source$
流被我们稍后从 source$
.
const source$ = interval(1000).pipe(share(), take(5));
然后我认为您在示例中放置的延迟表示由某种函数执行的某种处理,可能是对异步服务的调用。如果这是真的,那么我们可以有一个 process
函数,其中 returns 一个 Observable。这个函数的模拟版本可能是这个
function process(val) {
return of(val).pipe(
delay(3000),
tap({
next: (val) => console.log(val, "processed"),
})
);
}
现在,如果所有这些假设都成立,那么我们可以定义一个 dequeueSignal$
主题,如您的示例中所示,然后定义 2 个不同的流,一个流仅采用 [=14 通知的第一个元素=] 和一个包含所有其他元素的流,像这样
const dequeueSignal$ = new Subject<any>();
const first$ = source$.pipe(
first(),
concatMap((val) => {
return process(val).pipe(
tap({
complete: () => {
dequeueSignal$.next(null);
},
})
);
})
);
const afterFirst$ = source$.pipe(skip(1)).pipe(
buffer(dequeueSignal$),
concatMap((val) => {
return process(val).pipe(
tap({
complete: () => {
dequeueSignal$.next(null);
},
})
);
})
);
dequeueSignal$
用于触发释放buffer
运算符存储的缓冲区。
dequeueSignal$
在 first$
流中被 next
ed 一次,并且在 afterFirst$
流通知的任何时候。
dequeueSignal$
的通知触发缓冲项目的释放。
Here a stackblitz 显示代码。
可能更优雅的解决方案可以作为 mergeMap
运算符 code 的变体来实现,但它可能看起来有点复杂。
自定义运算符可以满足此要求,该运算符利用 concatMap
但会根据需要修改其行为。
自定义运算符必须保持一些状态:
- 一个
bufferedNotifications
缓冲区,它存储从上游接收的元素,以防已经有一个正在运行的请求 - a
processing
布尔值,如果为真,表示当前正在处理一个请求,在processing
期间收到的任何通知都会添加到bufferedNotifications
重要的是要注意,自定义运算符是 returns 类型 (observable: Observable) => Observable
.
说了这么多,我们来看一下自定义算子的代码
// the custom operator requires a function as input
// this function represents the actual processing that happens in the stream
// a type T is also added for better type checking and support
function concatBufferedMap<T>(processor: (val: T[]) => Observable<T[]>) {
// as said above, the custom operator returns a function which expects
// an Obaservable as input and returns another Obaservable
return (sourceObservable: Observable<T>) =>
// the Observable to be returned is constructed using the Observable constructor
new Observable<T[]>((subscriber) => {
// this function will be called each time this
// Observable is subscribed to.
// here we define the variables holding the state
// the state is initialized any time the Observable is subscribed
let bufferedNotifications = [] as T[];
let processing = false;
// the Observable returned by the custom operator transforms the
// source observable via a pipe and then subscribes to it
const subscription = sourceObservable
.pipe(
// the first thing we do when a value is notified by the
// source observable (i.e. by upstream) is to push the value
// into the buffer
tap((val) => {
bufferedNotifications.push(val);
}),
// then we leverage concatMap to make sure that we concatenate
// the various processings
concatMap(() => {
// if we are processing a previous request
// or if there are no items in the buffer
// just complete without notifying anything
// concatMap requires that the value returned by the function
// passed in as parameter is an Observable, so in this case
// we return EMPTY, which is an Observable that does nothing
// and immediately completes
if (processing || bufferedNotifications.length === 0) {
return EMPTY;
}
// otherwise, if there is no processing on fly and there are
// values in the buffer, we return the Observable created by
// the processor function and we set the state so that it reflects
// that we have started processing something
processing = true;
// we create a copy of the buffer so that the param passed
// to the processor function is not altered by other potential
// values which can be added to the buffer during the execution
// of processor logic
const _buffer = [...bufferedNotifications];
// the buffer is also reset
bufferedNotifications = [];
return processor(_buffer).pipe(
tap({
complete: () => {
console.log("DONE concatMap");
// when processor completes its processing, the processing
// state variable is set to false
processing = false;
},
})
);
})
)
// in the subscription logic, we notify the subscriber (i.e.
// downstream) of the values notified by the processor logic
.subscribe({
next(value) {
subscriber.next(value);
},
error(err) {
// We need to make sure we're propagating our errors through.
subscriber.error(err);
},
complete() {
console.log("subscriber complete");
subscriber.complete();
},
});
// Return the teardown logic. This will be invoked when
// the result errors, completes, or is unsubscribed.
return () => {
subscription.unsubscribe();
};
});
}
This stackblitz 显示代码如何工作以及应产生预期结果。