Couchbase 异步批处理错误处理

Couchbase async batch error handling

我想,Observables+Couchbase async 让我的大脑爆炸了 api :) 有人可以帮我吗? 已经与批处理操作斗争了几天,但仍然无法理解如何通过正确的错误处理来进行批处理操作。

比方说,我想批量更新 Couchbase 中的一些文档。 如果我使用同步 API,它看起来像:

List< JsonDocument> items = getItems(1, 2, 3, 4, 5);
// getItems - some method which calls bucket.get() for specified keys

for (JsonDocument item : items) {
   try {
      try {
         item.content().put("age", 42);
         bucket.replace(item);
      } catch (CASMismatchException e) {
        // retry
        bucket.get(item.id()).content().put("age", 42);
        bucket.replace(item);
      }
   } catch (Exception e) {
      // handle error which doesn't stop execution for other items
      // for example, add item id to list of failed items in response
      errorHandler.handleError(item.id(), e);
   }
}

但这不是并行的,文档说 async API 效率更高。 我无法理解的是如何通过 Observables 创建这样的流,我试过:

Observable.from(items)
.flatMap(item -> {
   item.content().put("age", 42);
   return bucket.async().replace(item);
})
.onErrorResumeNext(error -> {
   // what to do? return another observable which does retry logic above?
   // how do I know what item has failed?
   // I don't have ID of that item, nor I can extract it from passed Exception
   // why onErrorResumeNext is getting called only once (if one item fails)
   // and is not called for other items?
})
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log)

任何帮助将不胜感激! 谢谢

你可以这样做:

  Observable.from(items)
            .flatMap(item -> {
                item.content().put("age", 42);
                return bucket.async()
                        .replace(item)
                        .retry((count, throwable) -> count == 1 && throwable instanceof CASMismatchException)
                        .onErrorReturn(e -> {
                            errorHandler.handleError(item.id(), e);
                            return null; //or item, if you need the item further down the stream
                        })
                        .subscribeOn(Schedulers.io()); //not sure if it's needed with bucket.async()
            })
            .subscribeOn(<something>) //with this scheduler the put() method will be executed 
            .subscribe();

想法是通过 flatMap() 将每个项目处理分离到一个单独的 Observable,因为每个重试逻辑都是针对单个项目,而不是针对整个流。 重试运算符使用谓词进行操作,为您提供重试计数和异常,因此在您的情况下,我们第一次仅使用特定的 CASMismatchException 异常进行重试,然后对于错误我们可以简单地执行 onErrorReturn 并处理其他错误,您甚至可以 return 如果您想继续处理该项目。
需要注意的一件事是调度程序,我不确定 Couchbase 在进行 async() 调用时是否默认在 io() 上运行。另外,请考虑这一行:

 item.content().put("age", 42);

将在最后一个 subscribeOn() 上执行,因为它将在主流订阅调度程序上完成。