Couchbase 异步批处理错误处理
Couchbase async batch error handling
我想,Observables+Couchbase async 让我的大脑爆炸了 api :)
有人可以帮我吗?
已经与批处理操作斗争了几天,但仍然无法理解如何通过正确的错误处理来进行批处理操作。
比方说,我想批量更新 Couchbase 中的一些文档。
如果我使用同步 API,它看起来像:
List< JsonDocument> items = getItems(1, 2, 3, 4, 5);
// getItems - some method which calls bucket.get() for specified keys
for (JsonDocument item : items) {
try {
try {
item.content().put("age", 42);
bucket.replace(item);
} catch (CASMismatchException e) {
// retry
bucket.get(item.id()).content().put("age", 42);
bucket.replace(item);
}
} catch (Exception e) {
// handle error which doesn't stop execution for other items
// for example, add item id to list of failed items in response
errorHandler.handleError(item.id(), e);
}
}
但这不是并行的,文档说 async API 效率更高。
我无法理解的是如何通过 Observables 创建这样的流,我试过:
Observable.from(items)
.flatMap(item -> {
item.content().put("age", 42);
return bucket.async().replace(item);
})
.onErrorResumeNext(error -> {
// what to do? return another observable which does retry logic above?
// how do I know what item has failed?
// I don't have ID of that item, nor I can extract it from passed Exception
// why onErrorResumeNext is getting called only once (if one item fails)
// and is not called for other items?
})
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log)
任何帮助将不胜感激!
谢谢
你可以这样做:
Observable.from(items)
.flatMap(item -> {
item.content().put("age", 42);
return bucket.async()
.replace(item)
.retry((count, throwable) -> count == 1 && throwable instanceof CASMismatchException)
.onErrorReturn(e -> {
errorHandler.handleError(item.id(), e);
return null; //or item, if you need the item further down the stream
})
.subscribeOn(Schedulers.io()); //not sure if it's needed with bucket.async()
})
.subscribeOn(<something>) //with this scheduler the put() method will be executed
.subscribe();
想法是通过 flatMap()
将每个项目处理分离到一个单独的 Observable,因为每个重试逻辑都是针对单个项目,而不是针对整个流。
重试运算符使用谓词进行操作,为您提供重试计数和异常,因此在您的情况下,我们第一次仅使用特定的 CASMismatchException
异常进行重试,然后对于错误我们可以简单地执行 onErrorReturn
并处理其他错误,您甚至可以 return 如果您想继续处理该项目。
需要注意的一件事是调度程序,我不确定 Couchbase 在进行 async()
调用时是否默认在 io()
上运行。另外,请考虑这一行:
item.content().put("age", 42);
将在最后一个 subscribeOn() 上执行,因为它将在主流订阅调度程序上完成。
我想,Observables+Couchbase async 让我的大脑爆炸了 api :) 有人可以帮我吗? 已经与批处理操作斗争了几天,但仍然无法理解如何通过正确的错误处理来进行批处理操作。
比方说,我想批量更新 Couchbase 中的一些文档。 如果我使用同步 API,它看起来像:
List< JsonDocument> items = getItems(1, 2, 3, 4, 5);
// getItems - some method which calls bucket.get() for specified keys
for (JsonDocument item : items) {
try {
try {
item.content().put("age", 42);
bucket.replace(item);
} catch (CASMismatchException e) {
// retry
bucket.get(item.id()).content().put("age", 42);
bucket.replace(item);
}
} catch (Exception e) {
// handle error which doesn't stop execution for other items
// for example, add item id to list of failed items in response
errorHandler.handleError(item.id(), e);
}
}
但这不是并行的,文档说 async API 效率更高。 我无法理解的是如何通过 Observables 创建这样的流,我试过:
Observable.from(items)
.flatMap(item -> {
item.content().put("age", 42);
return bucket.async().replace(item);
})
.onErrorResumeNext(error -> {
// what to do? return another observable which does retry logic above?
// how do I know what item has failed?
// I don't have ID of that item, nor I can extract it from passed Exception
// why onErrorResumeNext is getting called only once (if one item fails)
// and is not called for other items?
})
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log)
任何帮助将不胜感激! 谢谢
你可以这样做:
Observable.from(items)
.flatMap(item -> {
item.content().put("age", 42);
return bucket.async()
.replace(item)
.retry((count, throwable) -> count == 1 && throwable instanceof CASMismatchException)
.onErrorReturn(e -> {
errorHandler.handleError(item.id(), e);
return null; //or item, if you need the item further down the stream
})
.subscribeOn(Schedulers.io()); //not sure if it's needed with bucket.async()
})
.subscribeOn(<something>) //with this scheduler the put() method will be executed
.subscribe();
想法是通过 flatMap()
将每个项目处理分离到一个单独的 Observable,因为每个重试逻辑都是针对单个项目,而不是针对整个流。
重试运算符使用谓词进行操作,为您提供重试计数和异常,因此在您的情况下,我们第一次仅使用特定的 CASMismatchException
异常进行重试,然后对于错误我们可以简单地执行 onErrorReturn
并处理其他错误,您甚至可以 return 如果您想继续处理该项目。
需要注意的一件事是调度程序,我不确定 Couchbase 在进行 async()
调用时是否默认在 io()
上运行。另外,请考虑这一行:
item.content().put("age", 42);
将在最后一个 subscribeOn() 上执行,因为它将在主流订阅调度程序上完成。