使用 RxJava 如何在 couchbase 中查找不可用的实体?

How to find unavailable entities in couchbase while using RxJava?

需求:从couchbase中批量读取一堆对象。即使一个对象不可用,也会抛出异常并显示一条消息,指出以下对象不可用。

方法: 我们使用 RxJava 和 asyncBucket 从数据库中读取这些对象。

代码:

        final List<String> failedToReadOrders = new ArrayList<>();
        final List<Order> list = Observable
                .from(ids)
                .subscribeOn(Schedulers.io())
                .flatMap(id -> asyncBucket
                        .get(id)
                        .doOnError(ex-> {
                            failedToReadOrders.add(id);
                            LOGGER.error("Error occured while reading order with ID key={}",id);
                            })
                        .retryWhen(retryFunc())
                        .onErrorResumeNext(Observable.empty()))
                .map(doc-> couchbaseConversionService.convertJsonToJavaObject(doc.content(), Order.class))
                .toList()
                .toBlocking()
                .single();

doOnError 方法只有在 asyncBucket.get() 方法抛出任何异常时才会被调用。但是,如果该项目不可用,则 asyncBucket.get() 文档会说 "If the document is not found, the Observable completes without an item emitted.".

问题: 我当然可以通过找到要读取的 ids 列表和返回的对象之间的差异来知道哪些项目没有被读取。但是,是否可以在上面的代码本身中收集这些 id,这样我就不必再次在列表上循环?

List<Order> list = Observable
    .from(ids)
    .subscribeOn(Schedulers.io())
    .flatMap(id -> asyncBucket
        .get(id)
        .switchIfEmpty(Observable.error(new Throwable("No data found")))
        .doOnError(ex-> {
            failedToReadOrders.add(id);
            LOGGER.error("Error occured while reading order with ID key={}",id);
        })
        .retryWhen(retryFunc())
        .onErrorResumeNext(Observable.empty()))
    .map(doc-> couchbaseConversionService.convertJsonToJavaObject(doc.content(), Order.class))
    .toList()
    .toBlocking()
    .single();