如何在 RxJava 和 couchbase 中使用排序?
How to use sorting in RxJava and couchbase?
我写了下面提到的方法来从 couchbase 服务器批量获取数据。
bucket.async()
.query(N1qlQuery.simple(query))
.doOnNext(res -> res.info().map(N1qlMetrics::elapsedTime).forEach(t -> System.out.println(t)))
.flatMap(AsyncN1qlQueryResult::rows)
.flatMap(row ->
bucket.async().
get(row.value().getString("id")))
.map(JsonDocument::content).
toList()
.toBlocking()
.single();
当我通过查询时,此代码运行良好
"SELECT meta().id as id FROM bucket"
但是当我使用类似
的东西时
"SELECT meta().id as id FROM bucket order by id ASC"
我得到的结果没有排序。但是,当我 运行 查询控制台上的相同查询结果符合预期时。这让我相信我在 rxJava 中做错了什么。请帮我解决这个问题。
由于 flatMap()
运算符,顺序丢失,它应用并发流,但不维护顺序。
当您应用 flatMap()
时,您正在为每个 onNext()
创建并订阅一个新的 Observable
,这意味着对于每一行,您正在并行执行此行:
bucket.async().
get(row.value().getString("id")))
然后每个获取操作将在不同的时间完成,并且获取的内容将无序地发出。
如果你想保持顺序但又不想失去并行性,你应该使用 concatMap(),它将只保持 1 个活动流,并按顺序订阅每个提取操作。
如果你need/want并行,你应该使用concatMapEager(),这将并行执行每个创建的 Observable,但会按顺序发出项目。
我写了下面提到的方法来从 couchbase 服务器批量获取数据。
bucket.async()
.query(N1qlQuery.simple(query))
.doOnNext(res -> res.info().map(N1qlMetrics::elapsedTime).forEach(t -> System.out.println(t)))
.flatMap(AsyncN1qlQueryResult::rows)
.flatMap(row ->
bucket.async().
get(row.value().getString("id")))
.map(JsonDocument::content).
toList()
.toBlocking()
.single();
当我通过查询时,此代码运行良好
"SELECT meta().id as id FROM bucket"
但是当我使用类似
的东西时"SELECT meta().id as id FROM bucket order by id ASC"
我得到的结果没有排序。但是,当我 运行 查询控制台上的相同查询结果符合预期时。这让我相信我在 rxJava 中做错了什么。请帮我解决这个问题。
由于 flatMap()
运算符,顺序丢失,它应用并发流,但不维护顺序。
当您应用 flatMap()
时,您正在为每个 onNext()
创建并订阅一个新的 Observable
,这意味着对于每一行,您正在并行执行此行:
bucket.async().
get(row.value().getString("id")))
然后每个获取操作将在不同的时间完成,并且获取的内容将无序地发出。
如果你想保持顺序但又不想失去并行性,你应该使用 concatMap(),它将只保持 1 个活动流,并按顺序订阅每个提取操作。
如果你need/want并行,你应该使用concatMapEager(),这将并行执行每个创建的 Observable,但会按顺序发出项目。