如果在 X 秒后没有收到 gRPC 流(可观察)数据,如何隐藏 div?
How to hide a div if no gRPC stream(observable) data received after X secs?
我正在处理绑定从 gRPC 服务流调用中消耗的流数据。流数据每 X 毫秒从 gRPC 服务发送一次,我将其作为可观察数据绑定到 html 中的 div,就像简单的 ngIf*.
我需要处理 X 秒后服务未发送任何数据的情况,我不应该显示 div。因此计划仅在 X 秒后没有收到流时才将 null 显式设置为绑定变量。
因此尝试找出使用 RxJs 运算符执行此操作的最佳方法。我正在查看 Interval、timer、switchMap,但不确定如何正确实现它们。
Service.ts
@Injectable()
export class ApiMyTestService {
client: GrpcMyTestServiceClient;
public readonly testData$: Observable<TestReply.AsObject>;
constructor(private readonly http: HttpClient) {
this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
this.testData$ = this.listCoreStream();
}
listCoreStream(): Observable <TestReply.AsObject> {
return new Observable(obs => {
const req = new SomeRequest();
req.setClientName("GrpcWebTestClient");
const stream = this.client.getCoreUpdates(req); // Main gRPC Server stream Api call
stream.on('status', (status: Status) => {
console.log('ApiService.getStream.status', status);
});
stream.on('data', (message: any) => {
// gRPC Api call returns stream data here. Every X millisec service sends stream data.
console.log('ApiService.getStream.data', message.toObject());
obs.next(message.toObject() as TestReply.AsObject);
});
stream.on('end', () => {
console.log('ApiService.getStream.end');
obs.complete();
});
});
}
Component.ts
export class AppComponent implements AfterViewInit, OnDestroy {
public testReply?: TestReply.AsObject;
private _subscription: Subscription;
constructor(private readonly _MyTestService: ApiMyTestService) {
this._subscription = new Subscription();
}
public ngAfterViewInit(): void {
this._subscription = this._MyTestService.testData$.subscribe((data) => {
//testReply will be binded to a div, so hide div if null data
// how to set to null when the service didn't stream any data after X seconds
this.testReply = data;
});
}
Html:
//show this div only when we have stream data. Hide it when no data is binded for X secs!
<div *ngIf=(testReply)>
html elements that uses testReply
</div>
您可以创建一个 observable,它从您的服务发出值,但也会在 x 秒后发出 undefined
:
public testReply$ = this._MyTestService.testData$.pipe(
switchMap(data => merge(
of(data),
timer(5000).pipe(mapTo(undefined))
))
);
工作原理如下:
merge
从多个来源创建一个单一的可观察对象,它会在任何来源发出时发出。在这种情况下,来源是:
switchMap
为我们订阅这个“内部可观察对象”并发出它的排放量。每次接收到发射时,它也会用一个新的替换内部可观察对象。因此,timer()
只有在 switchMap
没有在 5000 毫秒内接收到下一次发射时才会发射。
现在,您只需使用 testReply$
:
public ngAfterViewInit(): void {
this._subscription = this.testReply$.subscribe(
data => this.testReply = data
);
}
为了稍微简化一下,您可以在模板中利用 AsyncPipe
,而无需担心订阅:
<div *ngIf="testReply$ | async as testReply">
<p> {{ testReply }} </p>
</div>
export class AppComponent {
public testReply$: Observable<TestReply.AsObject> = this._MyTestService.testData$.pipe(
switchMap(data => merge(
of(data),
timer(5000).pipe(mapTo(undefined))
))
);
constructor(private readonly _MyTestService: ApiMyTestService) { }
}
我正在处理绑定从 gRPC 服务流调用中消耗的流数据。流数据每 X 毫秒从 gRPC 服务发送一次,我将其作为可观察数据绑定到 html 中的 div,就像简单的 ngIf*.
我需要处理 X 秒后服务未发送任何数据的情况,我不应该显示 div。因此计划仅在 X 秒后没有收到流时才将 null 显式设置为绑定变量。
因此尝试找出使用 RxJs 运算符执行此操作的最佳方法。我正在查看 Interval、timer、switchMap,但不确定如何正确实现它们。
Service.ts
@Injectable()
export class ApiMyTestService {
client: GrpcMyTestServiceClient;
public readonly testData$: Observable<TestReply.AsObject>;
constructor(private readonly http: HttpClient) {
this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
this.testData$ = this.listCoreStream();
}
listCoreStream(): Observable <TestReply.AsObject> {
return new Observable(obs => {
const req = new SomeRequest();
req.setClientName("GrpcWebTestClient");
const stream = this.client.getCoreUpdates(req); // Main gRPC Server stream Api call
stream.on('status', (status: Status) => {
console.log('ApiService.getStream.status', status);
});
stream.on('data', (message: any) => {
// gRPC Api call returns stream data here. Every X millisec service sends stream data.
console.log('ApiService.getStream.data', message.toObject());
obs.next(message.toObject() as TestReply.AsObject);
});
stream.on('end', () => {
console.log('ApiService.getStream.end');
obs.complete();
});
});
}
Component.ts
export class AppComponent implements AfterViewInit, OnDestroy {
public testReply?: TestReply.AsObject;
private _subscription: Subscription;
constructor(private readonly _MyTestService: ApiMyTestService) {
this._subscription = new Subscription();
}
public ngAfterViewInit(): void {
this._subscription = this._MyTestService.testData$.subscribe((data) => {
//testReply will be binded to a div, so hide div if null data
// how to set to null when the service didn't stream any data after X seconds
this.testReply = data;
});
}
Html:
//show this div only when we have stream data. Hide it when no data is binded for X secs!
<div *ngIf=(testReply)>
html elements that uses testReply
</div>
您可以创建一个 observable,它从您的服务发出值,但也会在 x 秒后发出 undefined
:
public testReply$ = this._MyTestService.testData$.pipe(
switchMap(data => merge(
of(data),
timer(5000).pipe(mapTo(undefined))
))
);
工作原理如下:
merge
从多个来源创建一个单一的可观察对象,它会在任何来源发出时发出。在这种情况下,来源是:switchMap
为我们订阅这个“内部可观察对象”并发出它的排放量。每次接收到发射时,它也会用一个新的替换内部可观察对象。因此,timer()
只有在switchMap
没有在 5000 毫秒内接收到下一次发射时才会发射。
现在,您只需使用 testReply$
:
public ngAfterViewInit(): void {
this._subscription = this.testReply$.subscribe(
data => this.testReply = data
);
}
为了稍微简化一下,您可以在模板中利用 AsyncPipe
,而无需担心订阅:
<div *ngIf="testReply$ | async as testReply">
<p> {{ testReply }} </p>
</div>
export class AppComponent {
public testReply$: Observable<TestReply.AsObject> = this._MyTestService.testData$.pipe(
switchMap(data => merge(
of(data),
timer(5000).pipe(mapTo(undefined))
))
);
constructor(private readonly _MyTestService: ApiMyTestService) { }
}