在 RxJS 中,映射不执行内部映射可观察对象

In RxJS, map not executing inner mapped observable

RxJS 的新手,但我正在尝试将单个元素流映射到另一个生成数组 after 所有 internal/subsequent 流都是 finished/loaded .但是,我的内部可观察对象似乎没有执行。他们只是被冷落。

高级别,我需要执行 http post 来上传文件列表(在两个不同的数组中到两个不同的端点)。由于它们很大,我模拟时延迟了 5 秒。请求需要并行执行,但仅限于一次并发执行 X(此处为 2)。这一切都需要在管道内,并且管道应该只允许流在所有post完成后继续。

https://stackblitz.com/edit/rxjs-pnwa1b

import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
  map(claim => 
    merge(
      from(first).pipe(map(photo => of(photo).pipe(delay(5000)))),
      from(second).pipe(map(video => of(video).pipe(delay(5000))))
    )
    .pipe(
      mergeAll(2)
    )
    .pipe(tap(val => console.log(`emit:${val}`)))
    .pipe(toArray())
    .pipe(tap(val => console.log(`emit:${val}`)))
  )
)
.pipe(
catchError(error => {
  console.log("error");
  return Observable.throw(error);
})
)
.subscribe(val => console.log(`final:${val}`));

内部订阅不会等到它们完成。使用 forkJoin 不允许我限制并发上传。我怎样才能做到这一点?

更新:

@dmcgrandle 的回答非常有帮助,让我做出了以下似乎有效的更改:

import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable, throwError } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
  mergeMap(claim =>
    merge(
      from(first).pipe(map(photo => of(photo).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`))))),
      from(second).pipe(map(video => of(video).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`)))))
    )
  ),
  mergeAll(2),
  toArray()
)
.pipe(
catchError(error => {
  console.log("error");
  return throwError(error);
})
)
.subscribe(val => console.log(`final:${val}`));

如果我对你的理解是正确的,那么我认为这是一个解决方案。您的问题出在第一个 map,它不会执行内部订阅,而只是将流转换为 Observables 的 Observables,这似乎不是您想要的。相反,我在那里使用了 mergeMap

from 中,我使用 concatMap 强制每个 firstsecond 的发射按顺序发生,并等待一个发射完成,然后另一个发射开始。我还设置了 postToEndpoint 函数,使 return Observables 更接近您实际代码的样子。

StackBlitz 演示

代码:

import { mergeMap, concatMap, delay, tap, catchError, toArray } from 'rxjs/operators';
import { merge, of, from, concat, throwError } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

const postToEndpoint1$ = photo => of(photo).pipe(
  tap(data => console.log('start of postTo1 for photo:', photo)),
  delay(5000),
  tap(data => console.log('end of postTo1 for photo:', photo))
);

const postToEndpoint2$ = video => of(video).pipe(
  tap(data => console.log('start of postTo2 for video:', video)),
  delay(5000),
  tap(data => console.log('end of postTo2 for video:', video))
);

of(single).pipe(
  tap(val => console.log(`initial emit:${val}`)),
  mergeMap(claim => 
    merge(
      from(first).pipe(concatMap(postToEndpoint1$)),
      from(second).pipe(concatMap(postToEndpoint2$))
      )
  ),
  toArray(),
  catchError(error => {
    console.log("error");
    return throwError(error);
  })
).subscribe(val => console.log(`final:`, val));

希望对您有所帮助。