RxJava - 如何为长轮询创建服务器

RxJava - How to create a server for long polling

我有一个集群中有多个节点的应用程序。每个节点将日志文件写入其本地磁盘。我已经实现了一个日志搜索功能,可以跨每个节点搜索日志。接收到浏览器搜索请求的所有者节点将日志搜索作业提交给其他节点,然后其他节点将搜索结果传递给原始节点。客户端网络浏览器使用长轮询从节点获取搜索结果。这似乎很适合 RxJava,因为每个节点都是一个事件流,客户端从所有节点获得一个合并的事件流。 (假设一个吝啬的运营团队不允许我们使用 Splunk 或其他一些商业日志记录解决方案)。

客户端轮询原始节点上的 REST API,收集搜索结果。我对 REST API 的理想逻辑如下:

我编写了以下示例代码来模拟这种情况:

public static void main(String[] args) throws Exception {
    ExecutorService executorService = Executors.newFixedThreadPool(5);

    /* Each searchTask represents the results of a search job running on a
     * node in the cluster */
    Subject<String,String> searchTask1 = PublishSubject.create();
    Subject<String,String> searchTask2 = PublishSubject.create();

    // Limit max number of search results
    Observable<String> searchResults = 
        Observable.merge(searchTask1, searchTask2).take( 1000 );

    /* Add a 100ms buffer window to collect nearby responses together.  
     * Filter out any empty buffers to eliminate unnecessary
     * responses to the browser. */
    BlockingObservable<List<String>> blocking = 
        searchResults.buffer(100, TimeUnit.MILLISECONDS)
            .filter(results -> !results.isEmpty()).toBlocking();
    Iterator<List<String>> it = blocking.getIterator();

    /* Each call to searchTask.onNext represents a search result pushed
     * to the owner node from another node.  This code would be called 
     * from the REST endpoint. */
    executorService.submit( () -> {
        searchTask1.onNext("1");
        try { Thread.sleep(1200); } catch ( Exception ignored ) { }
        searchTask1.onNext("2");
        searchTask1.onCompleted();
    });
    executorService.submit( () -> {
        searchTask2.onNext("a");
        try { Thread.sleep(500); } catch ( Exception ignored ) { }
        searchTask2.onNext("b");
        searchTask2.onCompleted();
    });

    executorService.submit( () -> {
        /* Each iteration of this loop represents a polling request from
         * the browser and the results that are sent back to it. */
        for ( int i = 0; i < 5; i++ ) { 
            it.forEachRemaining(results -> System.out.println(results));
        }
    });

    Thread.sleep(1500);
    System.out.println("exit");
}

for 循环中的逻辑应该是什么以确保响应将始终在最多 15 秒后发送回客户端(即使响应为空)?

编辑: 我已经用更多评论更新了示例代码并展示了我当前的解决方案,但我仍然无法获得 15 秒的最大响应时间正在寻找。我们有网络设备会关闭空闲时间过长的 HTTP 连接,所以我想保证客户端最多 15 秒后总是会得到响应。

我能找到的所有 RxJava 文章似乎都非常关注客户端代码而不是服务器端。然而,在与 Observable 运算符争论了一会儿之后,我想出了以下接近我想要的解决方案。心跳每 15 秒发生一次,而不是在上一个结果后 15 秒发生一次。这意味着服务器可能会在发送结果后立即向客户端发送心跳响应,但这对我来说已经足够了。

我创建了一个 15 秒间隔的可观察对象并将其与我已有的 searchResults 可观察对象合并。我使用了一个主题,这样我就可以在结果流停止时停止可观察的间隔(否则它会无限期地继续下去)。

    /* Add a heartbeat that ensures we don't wait too long between
     * sending responses and some network device kills our connection */
    final Observable<String> heartbeat =
        Observable.interval( 1, TimeUnit.SECONDS ).map(el -> "heartbeat");
    final PublishSubject<String> stopHeartbeat = PublishSubject.create();
    searchResults.subscribe( el -> {}, ex -> {}, () -> stopHeartbeat.onNext( null ) );
    final Observable<String> searchResultsWithHeartbeat =
        searchResults.mergeWith( heartbeat.takeUntil( stopHeartbeat ) );