Angular 2 如何在 Observable 间隔发送参数 - 持续上传数据馈送

Angular 2 How to send a parameter in Observable interval - Continuous upload of data feed

我想更改获取消息以仅获取我尚未收到的消息的增量。因此,如果我有多个用户,它每次只会给我其他人的消息。我想每 5 秒检索一次增量并将其附加到我已经检索到的结果中。

getMessages(){
    let maxMessageId = 0;
    console.log(this.messages);
    if(this.messages.length > 0 )
    {
        maxMessageId = Math.max.apply(Math, this.messages.map(function(message){return message.messageAutoIncreamentId;}));
    }
   return Observable.interval(5000)
        .switchMap(() =>this.http.get(this._domainUrl + 'message?maxMessageId='+ maxMessageId))
        .map((response: Response)=>{
             const messages = response.json().obj;
             let transformedMessages : Message[] = [];
             for( let message of messages)
             {
                 transformedMessages.push(
                     new Message(
                                 message.content,
                                 message.user.firstName ,
                                 message._id,
                                 message.user._id,
                                 message.messageAutoIncreamentId)
                 );
             }
             this.messages = transformedMessages;
             return transformedMessages;
        })
        .catch((error: Response) => {
            this.errorService.handleError(error.json());
            return Observable.throw(error.json());
        });
}

这里的问题是maxMessageId总是0,因为它可能是在区间外初始化的。如何将参数 maxMessageId 传递给 switchMap ?

是否有比 Rx / Observables 更好的选择来连续拉取数据?

可能你想要的是

   return Observable.interval(5000)
    .map(() => {
      if(this.messages.length > 0 ) {
        return Math.max.apply(Math, this.messages.map(function(message){return message.messageAutoIncreamentId;}));
      } else {
        return 0;
      }
    })
    .switchMap((maxMessageId) => this.http.get(this._domainUrl + 'message?maxMessageId='+ maxMessageId))
    .map((response: Response)=>{
         ...
         this.messages = this.messages.concat(transformedMessages);
         return transformedMessages;
    })
    .catch((error: Response) => {
        this.errorService.handleError(error.json());
        return Observable.throw(error.json());
    });

所以您设置侦听器的第一个函数不应该保存消息,我将它们移到了函数之外,以便可以更好地访问它们。

当您现在调用 getMessages() 时,它将每 5 秒发出一次请求,处理结果,然后推回消息数组。

你需要做的是当你 运行 你的 maxId 计算时你需要在 observable 中重新运行它。您更改消息数组,因此您的最大值将发生变化。您可以 运行 它在您的有限阵列 (t运行sformedMessages) 上,但我只是在主阵列上重新 运行 它。这应该有效,如果它给您带来问题,请告诉我。

-D

let maxMessageId = 0;

let messages = [];

getMessages(){
    // initial set of max
    if(messages.length > 0 )
    {
        maxMessageId = getMaxId(this.messages);
    }
   return Observable.interval(5000)
        .switchMap(() =>this.http.get(this._domainUrl + 'message?maxMessageId='+ maxMessageId))
        .map((response: Response)=>{
             const messages = response.json().obj;
             let transformedMessages : Message[] = [];
             for( let message of messages)
             {
                 transformedMessages.push(
                     new Message(
                                 message.content,
                                 message.user.firstName ,
                                 message._id,
                                 message.user._id,
                                 message.messageAutoIncreamentId)
                 );
             }
             messages = transformedMessages;
             // this changes the messages value, so we should also change the maxID
             maxMessageId = getMaxId(messages);
             return transformedMessages;
        })
        .catch((error: Response) => {
            this.errorService.handleError(error.json());
            return Observable.throw(error.json());
        });
}

getMaxId(messageList) {
    return Math.max.apply(Math, messageList.map(function(message){return message.messageAutoIncreamentId;}));
}