使用 Scala + Monix 异步获取 Cassandra 查询结果
Getting Cassandra query results asynchronously using Scala + Monix
我正在使用 AKKA Http, Monix and Datastax Java Driver for Apache Cassandra 构建 REST API,在尝试从 cassandra 获取一些项目时遇到了一些麻烦,等待查询完成并返回结果。
I'm able to print all the results easily, but unable to wait for the
query to be done and return all the items. My rest point simply
returns an empty array of items since it does not wait for the query
to be done.
我有一个 executeQuery
方法需要:
queryString: String
表示一个 cassandra 查询
page: Int
对分页有用
parameters: Any*
代表参数,如果查询需要
和returns一个Observable[Row]
。
然后,为了执行这样的查询、检索其结果、解析它们并将它们发回,我使用 Monix Observable and Subscription。
假设我想通过一个名为 pid
:
的公共字段检索一些项目
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import com.datastax.driver.core.Row
import monix.reactive.Observable
import cassandra.src.CassandraHelper
import item.src.entity.{Item, Items}
. . .
val keyspace = "my_keyspace"
val table = "items"
. . .
def getItems() : Items = {
var itemList: Items = List()
val observable: Observable[Row] = CassandraHelper.executeQuery(
"SELECT * FROM " + keyspace + "." + table,
1
)
observable.subscribe { row =>
itemList ::= ItemMapper.rowToItem()(row)
Ack.Continue
}
Items(itemList)
}
其中 rowToItem
只是将一行解析为 Item
和 Items: List[Item]
。
我正在看 Task 但我不太确定它是我要找的东西。
编辑
使用@Alexandru Nedelcu 解决方案,我能够在 itemList
中打印所有 items
,一旦它们被插入其中,但仍然对该调用得到空响应:{ "items" : [] }
.
这是编辑后的代码:
def getItems() : Items = {
var itemList: List[Item] = List()
val observable: Observable[Row] = CassandraHelper.executeQuery(
"SELECT * FROM " + keyspace + "." + table,
1
)
observable.subscribe { row =>
println(itemList)
itemList ::= ItemMapper.rowToItem()(row)
Ack.Continue
}
Items(itemList)
}
如何才能等到结果全部解析完插入item后再发回?
据我了解,您有一个 Observable[Row]
,并且您想从中构建一个 Items
,它聚合了源流中的每个 Row
元素,对吗?
如果是这样,foldLeftL
就是您想要的,它将把每个元素聚合成一个状态,return 源流完成后的最终结果:
// We need to suspend the Task, because your Items is probably a
// mutable object and it's best to suspend side effects ;-)
val items: Task[Items] = Task.suspend {
val initial: Items = _
val observable: Observable[Row] = ???
// This returns a Task[Items] when the source completes
observable.foldLeftL(initial) { (items, elem) =>
items ::= ItemMapper.rowToItem()(row)
// I don't understand if your `Items` is mutable or not
// but returning the same reference is fine
items
}
}
一个Task
是一个懒惰的Future
。您可以将其转换为 Future
和 runAsync
。此处有更多详细信息:https://monix.io/docs/2x/eval/task.html
我正在使用 AKKA Http, Monix and Datastax Java Driver for Apache Cassandra 构建 REST API,在尝试从 cassandra 获取一些项目时遇到了一些麻烦,等待查询完成并返回结果。
I'm able to print all the results easily, but unable to wait for the query to be done and return all the items. My rest point simply returns an empty array of items since it does not wait for the query to be done.
我有一个 executeQuery
方法需要:
queryString: String
表示一个 cassandra 查询page: Int
对分页有用parameters: Any*
代表参数,如果查询需要
和returns一个Observable[Row]
。
然后,为了执行这样的查询、检索其结果、解析它们并将它们发回,我使用 Monix Observable and Subscription。
假设我想通过一个名为 pid
:
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import com.datastax.driver.core.Row
import monix.reactive.Observable
import cassandra.src.CassandraHelper
import item.src.entity.{Item, Items}
. . .
val keyspace = "my_keyspace"
val table = "items"
. . .
def getItems() : Items = {
var itemList: Items = List()
val observable: Observable[Row] = CassandraHelper.executeQuery(
"SELECT * FROM " + keyspace + "." + table,
1
)
observable.subscribe { row =>
itemList ::= ItemMapper.rowToItem()(row)
Ack.Continue
}
Items(itemList)
}
其中 rowToItem
只是将一行解析为 Item
和 Items: List[Item]
。
我正在看 Task 但我不太确定它是我要找的东西。
编辑
使用@Alexandru Nedelcu 解决方案,我能够在 itemList
中打印所有 items
,一旦它们被插入其中,但仍然对该调用得到空响应:{ "items" : [] }
.
这是编辑后的代码:
def getItems() : Items = {
var itemList: List[Item] = List()
val observable: Observable[Row] = CassandraHelper.executeQuery(
"SELECT * FROM " + keyspace + "." + table,
1
)
observable.subscribe { row =>
println(itemList)
itemList ::= ItemMapper.rowToItem()(row)
Ack.Continue
}
Items(itemList)
}
如何才能等到结果全部解析完插入item后再发回?
据我了解,您有一个 Observable[Row]
,并且您想从中构建一个 Items
,它聚合了源流中的每个 Row
元素,对吗?
如果是这样,foldLeftL
就是您想要的,它将把每个元素聚合成一个状态,return 源流完成后的最终结果:
// We need to suspend the Task, because your Items is probably a
// mutable object and it's best to suspend side effects ;-)
val items: Task[Items] = Task.suspend {
val initial: Items = _
val observable: Observable[Row] = ???
// This returns a Task[Items] when the source completes
observable.foldLeftL(initial) { (items, elem) =>
items ::= ItemMapper.rowToItem()(row)
// I don't understand if your `Items` is mutable or not
// but returning the same reference is fine
items
}
}
一个Task
是一个懒惰的Future
。您可以将其转换为 Future
和 runAsync
。此处有更多详细信息:https://monix.io/docs/2x/eval/task.html