当有人订阅 observable B 时订阅 observable A
Subscribe to observable A when someone subscribe to observable B
我有一个 API 调用是可观察的。
我还有一个可观察的 B 来跟踪该查询的进度:
generateUrl$(uploadConfig: GenerateUploadUrlVariables, file: File) {
const generation$ = new BehaviorSubject<ActionWithResult<{ url: string }>>({
state: 'INIT',
result: null,
});
const complete = function (result: ActionWithResult<{ url: string }>) {
generation$.next(result);
generation$.complete();
};
this.generateUploadUrl(uploadConfig).pipe(
switchMap((result) => {
generation$.next({ state: 'UPLOADING', result: null });
const url = result.data.generateUploadUrl.url || '';
return this.httpClient
.pipe(
tap(() => {
complete({ state: 'SUCCEEDED', result: { url } });
}),
catchError((e) => {
complete({ state: 'FAILED', result: null });
return throwError(() => e);
})
);
}),
catchError((e) => {
complete({ state: ActionState.FAILED, result: null });
return throwError(() => e);
})
).subscribe();
return generation$
}
现在,我可以了
this.generateUrl$(option, file).subscribe(e=> {console.log(e)})
我会得到查询的状态,一旦成功就会得到结果。
但问题是,如果有人犯错并简单地做:
this.generateUrl$()
没有订阅,不会跟踪结果,但API仍然会被调用。
我想将 api observable this.generateUploadUrl
与跟踪 observable generation$
绑定
类似
return generation$.pipe(
whenSomeoneSubscribeToIt(() => {
this.generateUploadUrl(uploadConfig)
.pipe(//all the stuff above)
.subscribe()
})
)
可能吗?
这可能是在这种情况下不使用像 BehaviorSubject
这样的多播的原因之一。使用 new Observable()
函数创建一个可观察对象更适合您。
请注意,我还调整了一些其他实现细节,例如使用 tap
运算符的 finalize
和 error
回调而不是内部 catchError
运算符。
尝试以下方法
import { Observable, Observer, throwError } from 'rxjs';
import { catchError, finalize, switchMap, tap } from 'rxjs/operators';
generateUrl$(uploadConfig: GenerateUploadUrlVariables, file: File): Observable<any> {
return new Observable((generation$: Observer) => {
generation$.next({ state: 'INIT', result: null });
this.generateUploadUrl(uploadConfig).pipe(
switchMap((result: any) => {
generation$.next({ state: 'UPLOADING', result: null });
const url = result.data.generateUploadUrl.url || '';
return this.httpClient.get(someUrl).pipe(
tap({
next: (res: any) => generation$.next({ state: 'SUCCEEDED', result: { url } }),
error: (error: any) => generation$.next({ state: 'FAILED', result: null })
}),
finalize(() => generation$.complete())
);
}),
catchError((error: any) => {
generation$.next({ state: ActionState.FAILED, result: null });
generation$.complete();
return throwError(() => error);
})
).subscribe();
});
}
天真的解决方案
我创建了一个自定义运算符,它可以 运行 在订阅流之后但在接收到它的第一次发射之前产生效果。我将其命名为 initialize
,但它几乎就是您用 whenSomeoneSubscribeToIt
描述的内容。
这里是:
export function initialize<T>(
effect: () => void
): MonoTypeOperatorFunction<T> {
return s => new Observable(ob => {
const sub = s.subscribe(ob);
effect();
return sub;
});
}
现在您可以让您的订阅生效:
generateUrl$(
uploadConfig: GenerateUploadUrlVariables,
file: File
) {
// Some stuff here
return generation$.pipe(
initialize(() => {
this.generateUploadUrl(uploadConfig).pipe(
// some more stuff here
).subscribe();
})
);
}
地道的 RxJS
您根本不需要上面的 BehaviourSubject
。您基本上是在使用它来强制生成流。
正如另一个答案所指出的,您可以直接为观察者创建相同的流。这种方法要好一些,但仍然会遇到使用行为主体会遇到的许多问题。例如,该答案创建了一个不保证完成的可观察对象,并且在您取消订阅时不管理它的内部订阅。
我想说惯用的 RxJS 方法是在使用 RxJS 运算符后以声明方式创建流。
generateUrl$(
uploadConfig: GenerateUploadUrlVariables,
file: File
): Observable<StateWithResult> {
return this.generateUploadUrl(uploadConfig).pipe(
map(result => result.data.generateUploadUrl.url || ''),
switchMap(url => this.httpClient.pipe(
ignoreElements(),
startWith({ state: 'UPLOADING', result: null }),
concatWith(of({ state: 'SUCCEEDED', result: { url } }))
)),
catchError(_ => of({ state: 'FAILED', result: null })),
startWith({ state: 'INIT', result: null })
);
}
我在这里所做的一个更改是我不会重新抛出您的错误。看起来你并没有在下游使用它们,所以这是一种更干净的处理它们的方法。您可以(当然!)回去重新抛出它们。
即使消费者没有调用 .subscribe()
,http 调用也会触发的原因是因为您在函数内部的 generateUploadUrl
上调用 .subscribe()
。
你可以得到你想要的行为(我想说的是正确的反应行为)通过简单地返回由 generateUploadUrl
返回的可观察对象并管道它的结果到您想要的值。我们可以使用 startWith
发出您的初始值:
generateUrl$(uploadConfig: GenerateUploadUrlVariables, file: File): Observable<ActionWithResult> {
return this.generateUploadUrl(uploadConfig).pipe(
map(result => result.data.generateUploadUrl.url || ''),
switchMap(url => this.httpClient.pipe(
map(() => ({ state: ActionState.Succeeded, result: { url } })),
startWith({ state: ActionState.Uploading, result: null })
)),
catchError(() => of({ state: ActionState.Failed, result: null })),
startWith({ state: ActionState.Init, result: null })
);
}
这是一个有效的 StackBlitz 演示。
请注意,您没有在函数内部进行订阅,这首先导致了您的不良行为。
如果您需要多播行为(如果您有多个订阅者),您只需在管道的末尾添加一个 shareReplay
。
我有一个 API 调用是可观察的。 我还有一个可观察的 B 来跟踪该查询的进度:
generateUrl$(uploadConfig: GenerateUploadUrlVariables, file: File) {
const generation$ = new BehaviorSubject<ActionWithResult<{ url: string }>>({
state: 'INIT',
result: null,
});
const complete = function (result: ActionWithResult<{ url: string }>) {
generation$.next(result);
generation$.complete();
};
this.generateUploadUrl(uploadConfig).pipe(
switchMap((result) => {
generation$.next({ state: 'UPLOADING', result: null });
const url = result.data.generateUploadUrl.url || '';
return this.httpClient
.pipe(
tap(() => {
complete({ state: 'SUCCEEDED', result: { url } });
}),
catchError((e) => {
complete({ state: 'FAILED', result: null });
return throwError(() => e);
})
);
}),
catchError((e) => {
complete({ state: ActionState.FAILED, result: null });
return throwError(() => e);
})
).subscribe();
return generation$
}
现在,我可以了
this.generateUrl$(option, file).subscribe(e=> {console.log(e)})
我会得到查询的状态,一旦成功就会得到结果。
但问题是,如果有人犯错并简单地做:
this.generateUrl$()
没有订阅,不会跟踪结果,但API仍然会被调用。
我想将 api observable this.generateUploadUrl
与跟踪 observable generation$
类似
return generation$.pipe(
whenSomeoneSubscribeToIt(() => {
this.generateUploadUrl(uploadConfig)
.pipe(//all the stuff above)
.subscribe()
})
)
可能吗?
这可能是在这种情况下不使用像 BehaviorSubject
这样的多播的原因之一。使用 new Observable()
函数创建一个可观察对象更适合您。
请注意,我还调整了一些其他实现细节,例如使用 tap
运算符的 finalize
和 error
回调而不是内部 catchError
运算符。
尝试以下方法
import { Observable, Observer, throwError } from 'rxjs';
import { catchError, finalize, switchMap, tap } from 'rxjs/operators';
generateUrl$(uploadConfig: GenerateUploadUrlVariables, file: File): Observable<any> {
return new Observable((generation$: Observer) => {
generation$.next({ state: 'INIT', result: null });
this.generateUploadUrl(uploadConfig).pipe(
switchMap((result: any) => {
generation$.next({ state: 'UPLOADING', result: null });
const url = result.data.generateUploadUrl.url || '';
return this.httpClient.get(someUrl).pipe(
tap({
next: (res: any) => generation$.next({ state: 'SUCCEEDED', result: { url } }),
error: (error: any) => generation$.next({ state: 'FAILED', result: null })
}),
finalize(() => generation$.complete())
);
}),
catchError((error: any) => {
generation$.next({ state: ActionState.FAILED, result: null });
generation$.complete();
return throwError(() => error);
})
).subscribe();
});
}
天真的解决方案
我创建了一个自定义运算符,它可以 运行 在订阅流之后但在接收到它的第一次发射之前产生效果。我将其命名为 initialize
,但它几乎就是您用 whenSomeoneSubscribeToIt
描述的内容。
这里是:
export function initialize<T>(
effect: () => void
): MonoTypeOperatorFunction<T> {
return s => new Observable(ob => {
const sub = s.subscribe(ob);
effect();
return sub;
});
}
现在您可以让您的订阅生效:
generateUrl$(
uploadConfig: GenerateUploadUrlVariables,
file: File
) {
// Some stuff here
return generation$.pipe(
initialize(() => {
this.generateUploadUrl(uploadConfig).pipe(
// some more stuff here
).subscribe();
})
);
}
地道的 RxJS
您根本不需要上面的 BehaviourSubject
。您基本上是在使用它来强制生成流。
正如另一个答案所指出的,您可以直接为观察者创建相同的流。这种方法要好一些,但仍然会遇到使用行为主体会遇到的许多问题。例如,该答案创建了一个不保证完成的可观察对象,并且在您取消订阅时不管理它的内部订阅。
我想说惯用的 RxJS 方法是在使用 RxJS 运算符后以声明方式创建流。
generateUrl$(
uploadConfig: GenerateUploadUrlVariables,
file: File
): Observable<StateWithResult> {
return this.generateUploadUrl(uploadConfig).pipe(
map(result => result.data.generateUploadUrl.url || ''),
switchMap(url => this.httpClient.pipe(
ignoreElements(),
startWith({ state: 'UPLOADING', result: null }),
concatWith(of({ state: 'SUCCEEDED', result: { url } }))
)),
catchError(_ => of({ state: 'FAILED', result: null })),
startWith({ state: 'INIT', result: null })
);
}
我在这里所做的一个更改是我不会重新抛出您的错误。看起来你并没有在下游使用它们,所以这是一种更干净的处理它们的方法。您可以(当然!)回去重新抛出它们。
即使消费者没有调用 .subscribe()
,http 调用也会触发的原因是因为您在函数内部的 generateUploadUrl
上调用 .subscribe()
。
你可以得到你想要的行为(我想说的是正确的反应行为)通过简单地返回由 generateUploadUrl
返回的可观察对象并管道它的结果到您想要的值。我们可以使用 startWith
发出您的初始值:
generateUrl$(uploadConfig: GenerateUploadUrlVariables, file: File): Observable<ActionWithResult> {
return this.generateUploadUrl(uploadConfig).pipe(
map(result => result.data.generateUploadUrl.url || ''),
switchMap(url => this.httpClient.pipe(
map(() => ({ state: ActionState.Succeeded, result: { url } })),
startWith({ state: ActionState.Uploading, result: null })
)),
catchError(() => of({ state: ActionState.Failed, result: null })),
startWith({ state: ActionState.Init, result: null })
);
}
这是一个有效的 StackBlitz 演示。
请注意,您没有在函数内部进行订阅,这首先导致了您的不良行为。
如果您需要多播行为(如果您有多个订阅者),您只需在管道的末尾添加一个 shareReplay
。