如果在 X 秒后没有收到 gRPC 流(可观察)数据,如何隐藏 div?

How to hide a div if no gRPC stream(observable) data received after X secs?

我正在处理绑定从 gRPC 服务流调用中消耗的流数据。流数据每 X 毫秒从 gRPC 服务发送一次,我将其作为可观察数据绑定到 html 中的 div,就像简单的 ngIf*.

我需要处理 X 秒后服务未发送任何数据的情况,我不应该显示 div。因此计划仅在 X 秒后没有收到流时才将 null 显式设置为绑定变量。

因此尝试找出使用 RxJs 运算符执行此操作的最佳方法。我正在查看 Interval、timer、switchMap,但不确定如何正确实现它们。

Service.ts

  @Injectable()
    export class ApiMyTestService {    
        client: GrpcMyTestServiceClient;
        public readonly testData$: Observable<TestReply.AsObject>;
  
    constructor(private readonly http: HttpClient) {
        this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
        this.testData$ = this.listCoreStream();
    }
  
   listCoreStream(): Observable <TestReply.AsObject> {
    return new Observable(obs => {      
      const req = new SomeRequest(); 
      req.setClientName("GrpcWebTestClient");
      const stream = this.client.getCoreUpdates(req); // Main gRPC Server stream Api call 
      
      stream.on('status', (status: Status) => {
        console.log('ApiService.getStream.status', status);
      });
      
      stream.on('data', (message: any) => {
        // gRPC Api call returns stream data here. Every X millisec service sends stream data.
        console.log('ApiService.getStream.data', message.toObject());
        obs.next(message.toObject() as TestReply.AsObject);
      });
      
      stream.on('end', () => {
        console.log('ApiService.getStream.end');
        obs.complete();        
      });     
    });
  }

Component.ts

export class AppComponent implements AfterViewInit, OnDestroy {   
   public testReply?: TestReply.AsObject;   
   private _subscription: Subscription;

    constructor(private readonly _MyTestService: ApiMyTestService) {     
      this._subscription = new Subscription();
    }

    public ngAfterViewInit(): void {    
        this._subscription = this._MyTestService.testData$.subscribe((data) => {           
           //testReply will be binded to a div, so hide div if null data
           // how to set to null when the service didn't stream any data after X seconds
           this.testReply = data;
            
        });
}

Html:

//show this div only when we have stream data. Hide it when no data is binded for X secs!
<div *ngIf=(testReply)>
    html elements that uses testReply
</div>

您可以创建一个 observable,它从您的服务发出值,但也会在 x 秒后发出 undefined

public testReply$ = this._MyTestService.testData$.pipe(
    switchMap(data => merge(
        of(data), 
        timer(5000).pipe(mapTo(undefined))
    ))
);

工作原理如下:

  • merge 从多个来源创建一个单一的可观察对象,它会在任何来源发出时发出。在这种情况下,来源是:

    • 服务发出的值 (data)。 of 用于从我们的普通值
    • 创建一个可观察对象
    • 一个将在 5000 毫秒后发出 undefined 的 Observable。 timer() 将在指定时间后发出数字 0,因此我们使用 mapTo 代替发出 undefined
  • switchMap 为我们订阅这个“内部可观察对象”并发出它的排放量。每次接收到发射时,它也会用一个新的替换内部可观察对象。因此,timer() 只有在 switchMap 没有在 5000 毫秒内接收到下一次发射时才会发射。

现在,您只需使用 testReply$:

public ngAfterViewInit(): void {    
    this._subscription = this.testReply$.subscribe(
        data => this.testReply = data
    );
}

为了稍微简化一下,您可以在模板中利用 AsyncPipe,而无需担心订阅:

<div *ngIf="testReply$ | async as testReply">
    <p> {{ testReply }} </p>
</div>
export class AppComponent {

  public testReply$: Observable<TestReply.AsObject> = this._MyTestService.testData$.pipe(
    switchMap(data => merge(
      of(data), 
      timer(5000).pipe(mapTo(undefined))
    ))
  );

  constructor(private readonly _MyTestService: ApiMyTestService) { }

}