使用 Scala + Monix 异步获取 Cassandra 查询结果

Getting Cassandra query results asynchronously using Scala + Monix

我正在使用 AKKA Http, Monix and Datastax Java Driver for Apache Cassandra 构建 REST API,在尝试从 cassandra 获取一些项目时遇到了一些麻烦,等待查询完成并返回结果。

I'm able to print all the results easily, but unable to wait for the query to be done and return all the items. My rest point simply returns an empty array of items since it does not wait for the query to be done.

我有一个 executeQuery 方法需要:

和returns一个Observable[Row]

然后,为了执行这样的查询、检索其结果、解析它们并将它们发回,我使用 Monix Observable and Subscription

假设我想通过一个名为 pid:

的公共字段检索一些项目
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import com.datastax.driver.core.Row
import monix.reactive.Observable

import cassandra.src.CassandraHelper
import item.src.entity.{Item, Items}

. . .
val keyspace = "my_keyspace"
val table = "items"
. . .

def getItems() : Items = {
  var itemList: Items = List()
  val observable: Observable[Row] = CassandraHelper.executeQuery(
    "SELECT * FROM " + keyspace + "." + table,
    1
  )
  observable.subscribe { row =>
    itemList ::= ItemMapper.rowToItem()(row)
    Ack.Continue
  }
  Items(itemList)
}

其中 rowToItem 只是将一行解析为 ItemItems: List[Item]。 我正在看 Task 但我不太确定它是我要找的东西。

编辑

使用@Alexandru Nedelcu 解决方案,我能够在 itemList 中打印所有 items ,一旦它们被插入其中,但仍然对该调用得到空响应:{ "items" : [] }.

这是编辑后的代码:

def getItems() : Items = {
  var itemList: List[Item] = List()
  val observable: Observable[Row] = CassandraHelper.executeQuery(
    "SELECT * FROM " + keyspace + "." + table,
    1
  )
  observable.subscribe { row =>
    println(itemList)
    itemList ::= ItemMapper.rowToItem()(row)
    Ack.Continue
  }
  Items(itemList)
}

如何才能等到结果全部解析完插入item后再发回?

据我了解,您有一个 Observable[Row],并且您想从中构建一个 Items,它聚合了源流中的每个 Row 元素,对吗?

如果是这样,foldLeftL 就是您想要的,它将把每个元素聚合成一个状态,return 源流完成后的最终结果:

// We need to suspend the Task, because your Items is probably a
// mutable object and it's best to suspend side effects ;-)
val items: Task[Items] = Task.suspend {
  val initial: Items = _
  val observable: Observable[Row] = ???

  // This returns a Task[Items] when the source completes
  observable.foldLeftL(initial) { (items, elem) =>
    items ::= ItemMapper.rowToItem()(row)
    // I don't understand if your `Items` is mutable or not
    // but returning the same reference is fine
    items
  }
}

一个Task是一个懒惰的Future。您可以将其转换为 FuturerunAsync。此处有更多详细信息:https://monix.io/docs/2x/eval/task.html