在 RxJS 中,映射不执行内部映射可观察对象
In RxJS, map not executing inner mapped observable
RxJS 的新手,但我正在尝试将单个元素流映射到另一个生成数组 after 所有 internal/subsequent 流都是 finished/loaded .但是,我的内部可观察对象似乎没有执行。他们只是被冷落。
高级别,我需要执行 http post 来上传文件列表(在两个不同的数组中到两个不同的端点)。由于它们很大,我模拟时延迟了 5 秒。请求需要并行执行,但仅限于一次并发执行 X(此处为 2)。这一切都需要在管道内,并且管道应该只允许流在所有post完成后继续。
https://stackblitz.com/edit/rxjs-pnwa1b
import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
map(claim =>
merge(
from(first).pipe(map(photo => of(photo).pipe(delay(5000)))),
from(second).pipe(map(video => of(video).pipe(delay(5000))))
)
.pipe(
mergeAll(2)
)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(toArray())
.pipe(tap(val => console.log(`emit:${val}`)))
)
)
.pipe(
catchError(error => {
console.log("error");
return Observable.throw(error);
})
)
.subscribe(val => console.log(`final:${val}`));
内部订阅不会等到它们完成。使用 forkJoin 不允许我限制并发上传。我怎样才能做到这一点?
更新:
@dmcgrandle 的回答非常有帮助,让我做出了以下似乎有效的更改:
import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable, throwError } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
mergeMap(claim =>
merge(
from(first).pipe(map(photo => of(photo).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`))))),
from(second).pipe(map(video => of(video).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`)))))
)
),
mergeAll(2),
toArray()
)
.pipe(
catchError(error => {
console.log("error");
return throwError(error);
})
)
.subscribe(val => console.log(`final:${val}`));
如果我对你的理解是正确的,那么我认为这是一个解决方案。您的问题出在第一个 map
,它不会执行内部订阅,而只是将流转换为 Observables 的 Observables,这似乎不是您想要的。相反,我在那里使用了 mergeMap
。
在 from
中,我使用 concatMap 强制每个 first
和 second
的发射按顺序发生,并等待一个发射完成,然后另一个发射开始。我还设置了 postToEndpoint
函数,使 return Observables 更接近您实际代码的样子。
StackBlitz 演示
代码:
import { mergeMap, concatMap, delay, tap, catchError, toArray } from 'rxjs/operators';
import { merge, of, from, concat, throwError } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
const postToEndpoint1$ = photo => of(photo).pipe(
tap(data => console.log('start of postTo1 for photo:', photo)),
delay(5000),
tap(data => console.log('end of postTo1 for photo:', photo))
);
const postToEndpoint2$ = video => of(video).pipe(
tap(data => console.log('start of postTo2 for video:', video)),
delay(5000),
tap(data => console.log('end of postTo2 for video:', video))
);
of(single).pipe(
tap(val => console.log(`initial emit:${val}`)),
mergeMap(claim =>
merge(
from(first).pipe(concatMap(postToEndpoint1$)),
from(second).pipe(concatMap(postToEndpoint2$))
)
),
toArray(),
catchError(error => {
console.log("error");
return throwError(error);
})
).subscribe(val => console.log(`final:`, val));
希望对您有所帮助。
RxJS 的新手,但我正在尝试将单个元素流映射到另一个生成数组 after 所有 internal/subsequent 流都是 finished/loaded .但是,我的内部可观察对象似乎没有执行。他们只是被冷落。
高级别,我需要执行 http post 来上传文件列表(在两个不同的数组中到两个不同的端点)。由于它们很大,我模拟时延迟了 5 秒。请求需要并行执行,但仅限于一次并发执行 X(此处为 2)。这一切都需要在管道内,并且管道应该只允许流在所有post完成后继续。
https://stackblitz.com/edit/rxjs-pnwa1b
import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
map(claim =>
merge(
from(first).pipe(map(photo => of(photo).pipe(delay(5000)))),
from(second).pipe(map(video => of(video).pipe(delay(5000))))
)
.pipe(
mergeAll(2)
)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(toArray())
.pipe(tap(val => console.log(`emit:${val}`)))
)
)
.pipe(
catchError(error => {
console.log("error");
return Observable.throw(error);
})
)
.subscribe(val => console.log(`final:${val}`));
内部订阅不会等到它们完成。使用 forkJoin 不允许我限制并发上传。我怎样才能做到这一点?
更新:
@dmcgrandle 的回答非常有帮助,让我做出了以下似乎有效的更改:
import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable, throwError } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
mergeMap(claim =>
merge(
from(first).pipe(map(photo => of(photo).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`))))),
from(second).pipe(map(video => of(video).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`)))))
)
),
mergeAll(2),
toArray()
)
.pipe(
catchError(error => {
console.log("error");
return throwError(error);
})
)
.subscribe(val => console.log(`final:${val}`));
如果我对你的理解是正确的,那么我认为这是一个解决方案。您的问题出在第一个 map
,它不会执行内部订阅,而只是将流转换为 Observables 的 Observables,这似乎不是您想要的。相反,我在那里使用了 mergeMap
。
在 from
中,我使用 concatMap 强制每个 first
和 second
的发射按顺序发生,并等待一个发射完成,然后另一个发射开始。我还设置了 postToEndpoint
函数,使 return Observables 更接近您实际代码的样子。
StackBlitz 演示
代码:
import { mergeMap, concatMap, delay, tap, catchError, toArray } from 'rxjs/operators';
import { merge, of, from, concat, throwError } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
const postToEndpoint1$ = photo => of(photo).pipe(
tap(data => console.log('start of postTo1 for photo:', photo)),
delay(5000),
tap(data => console.log('end of postTo1 for photo:', photo))
);
const postToEndpoint2$ = video => of(video).pipe(
tap(data => console.log('start of postTo2 for video:', video)),
delay(5000),
tap(data => console.log('end of postTo2 for video:', video))
);
of(single).pipe(
tap(val => console.log(`initial emit:${val}`)),
mergeMap(claim =>
merge(
from(first).pipe(concatMap(postToEndpoint1$)),
from(second).pipe(concatMap(postToEndpoint2$))
)
),
toArray(),
catchError(error => {
console.log("error");
return throwError(error);
})
).subscribe(val => console.log(`final:`, val));
希望对您有所帮助。