RxJava + 改造轮询

RxJava + Retrofit polling

我的问题是我无法通过 Retrofit 获得无限流。在我获得初始 poll() 请求的凭据后 - 我执行初始 poll() 请求。如果没有变化,每个 poll() 请求都会在 25 秒内做出响应,如果有任何变化,则更早响应 - 返回 changed_data[]。每个响应包含下一个轮询请求所需的 timestamp 数据 - 我应该在每个 poll() 响应后执行新的 poll() 请求。这是我的代码:

getServerApi().getLongPollServer() 
  .flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "") 
   .take(1) 
   .flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), ""))) 
   .retry()
   .subscribe(longPollEnvelope1 -> {
   processUpdates(longPollEnvelope1.getUpdates());
});

我是RxJava的新手,可能有些不明白,但是我无法无限流。我接到 3 个电话,然后是 onNext 和 onComplete。

P.S。也许有更好的解决方案来实现 Android?

上的长轮询

虽然不理想,但我相信您可以使用 RX 的副作用来达到预期的结果('doOn' 操作)。

Observable<CredentialsWithTimestamp> credentialsProvider = Observable.just(new CredentialsWithTimestamp("credentials", 1434873025320L)); // replace with your implementation

Observable<ServerResponse> o = credentialsProvider.flatMap(credentialsWithTimestamp -> {
    // side effect variable
    AtomicLong timestamp = new AtomicLong(credentialsWithTimestamp.timestamp); // computational steering (inc. initial value)
    return Observable.just(credentialsWithTimestamp.credentials) // same credentials are reused for each request - if invalid / onError, the later retry() will be called for new credentials
            .flatMap(credentials -> api.query("request", credentials, timestamp.get()))  // this will use the value from previous doOnNext
            .doOnNext(serverResponse -> timestamp.set(serverResponse.getTimestamp()))
            .repeat();
})
        .retry()
        .share();

private static class CredentialsWithTimestamp {

    public final String credentials;
    public final long timestamp; // I assume this is necessary for you from the first request

    public CredentialsWithTimestamp(String credentials, long timestamp) {
        this.credentials = credentials;
        this.timestamp = timestamp;
    }
}

当订阅 'o' 时,内部 observable 将重复。如果出现错误,那么 'o' 将重试并从凭证流中重新请求。

在您的示例中,计算导向是通过更新下一个请求所必需的时间戳变量来实现的。