Iterate with hasNext() and next() over an asynchronously generated stream of elements
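I have to implement an Iterator interface (as defined by the Java API), with hasNext() and next() methods, that returns result elements originating from an asynchronously processed HTTP response (handled with Akka actors).
The following requirements have to be satisfied:
- Do not block and wait for the asynchronous operation to finish, because generating large result sets may take a while (the iterator should return result elements as soon as they are available)
- Iterator.next() should block until the next element is available (or throw an exception if no more elements will come)
- Iterator.hasNext() should return true as long as there are more elements (even if the next one is not available yet)
- The total number of results is not known in advance. The result-producing actor will send a specific "end message" when it is done.
- Try to avoid relying on InterruptedException, e.g. when the iterator is waiting on an empty queue but no more elements will be produced.
I have not looked into Java 8 streams or Akka Streams yet. But since I basically have to iterate over a queue (a finite stream), I doubt that there is a suitable ready-made solution.
Currently, my Scala implementation stub uses java.util.concurrent.BlockingQueue and looks like this: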
import java.util.concurrent.ArrayBlockingQueue
import akka.actor.Actor

case class Result(value: Any) // sent by result producing actor
case object Done // sent by result producing actor when finished

class ResultStreamIterator extends Iterator[Result] {
  val resultQueue = new ArrayBlockingQueue[Option[Result]](100)

  def hasNext(): Boolean = ??? // return true if not done yet
  def next(): Result = ??? // take() next element if not done yet

  class ResultCollector extends Actor {
    def receive = {
      // enqueue the Result itself; the queue holds Option[Result], not the raw value
      case result: Result => resultQueue.put(Some(result))
      case Done => resultQueue.put(None)
    }
  }
}
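I use an Option[Result] so that None can mark the end of the result stream. I have experimented with peeking at the next element and using a 'done' flag, but I hope there is an easier solution.
Bonus questions:
- How can synchronous and asynchronous implementations be covered by unit tests, especially testing delayed result generation?
- How can the iterator be made thread-safe?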
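The following code can satisfy the requirements.
An actor's fields can be modified safely inside the actor's receive method.
So resultQueue should not be a field of the Iterator, but a field of the Actor.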
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import akka.actor.{Actor, ActorRef, Stash}
import akka.pattern.ask
import akka.util.Timeout

// ResultCollector must be initialized before use.
// The initialization code looks like:
// resultCollector ! Initialize(100)
class ResultStreamIterator(resultCollector: ActorRef) extends Iterator[Result] {
  implicit val timeout: Timeout = ???
  override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
    case ResponseHasNext(hasNext) => hasNext
  }
  @scala.annotation.tailrec
  final override def next(): Result = Await.result(resultCollector ? RequestResult, Duration.Inf) match {
    case ResponseResult(result) => result
    case Finished => throw new NoSuchElementException("There are no more results.")
    case WaitingResult => next() // should wait for a moment before retrying
  }
}

case object RequestResult
case object HasNext
case class ResponseResult(result: Result)
case class ResponseHasNext(hasNext: Boolean)
case object Finished
case object WaitingResult
case class Initialize(expects: Int)

// This code may be more elegant if it used Actor FSM.
// The actor's state is (beforeInitialized) -> (collecting) -> (allCollected)
class ResultCollector extends Actor with Stash {
  val results = scala.collection.mutable.Queue.empty[Result]
  var expects = 0
  var counts = 0
  var isAllCollected = false

  def beforeInitialized: Actor.Receive = {
    case Initialize(n) =>
      expects = n
      if (expects != 0) context become collecting
      else context become allCollected
      unstashAll()
    case _ => stash()
  }

  def collecting: Actor.Receive = {
    case RequestResult =>
      if (results.isEmpty) sender ! WaitingResult
      else sender ! ResponseResult(results.dequeue())
    // reply to the asker; without the send the ask would never complete
    case HasNext => sender ! ResponseHasNext(true)
    case result: Result =>
      results += result
      counts += 1
      isAllCollected = counts >= expects
      if (isAllCollected) context become allCollected
  }

  def allCollected: Actor.Receive = {
    case RequestResult =>
      if (results.isEmpty) sender ! Finished
      else sender ! ResponseResult(results.dequeue())
    case HasNext => sender ! ResponseHasNext(results.nonEmpty)
  }

  def receive = beforeInitialized
}
You can store the next element in a variable and simply wait for it at the start of both methods:
private var nextNext: Option[Result] = null

def hasNext(): Boolean = {
  if (nextNext == null) nextNext = resultQueue.take()
  !nextNext.isEmpty
}

def next(): Result = {
  if (nextNext == null) nextNext = resultQueue.take()
  if (nextNext.isEmpty) throw new NoSuchElementException()
  val result = nextNext.get
  nextNext = null
  result
}
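To make the look-ahead idea concrete, and to touch on the bonus questions about delayed results and thread safety, here is a minimal self-contained sketch without Akka. QueueIterator, QueueIteratorDemo and collectDelayed are names I made up for illustration, and a plain thread stands in for the result-producing actor. The synchronized blocks are just one possible way to let several threads share the iterator, not necessarily the best one.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

// Hypothetical element type, mirroring the Result class from the question.
final case class Result(value: Any)

// Look-ahead iterator over a queue in which None marks the end of the stream.
class QueueIterator(queue: BlockingQueue[Option[Result]]) extends Iterator[Result] {
  // None = nothing fetched yet; Some(Some(r)) = element ready; Some(None) = end marker seen.
  private var lookahead: Option[Option[Result]] = None

  // Blocks until the producer has delivered either an element or the end marker.
  private def fetch(): Option[Result] = synchronized {
    if (lookahead.isEmpty) lookahead = Some(queue.take())
    lookahead.get
  }

  override def hasNext: Boolean = fetch().isDefined

  override def next(): Result = synchronized {
    val element = fetch().getOrElse(throw new NoSuchElementException("end of stream"))
    lookahead = None // consume the element; the end marker is never cleared
    element
  }
}

object QueueIteratorDemo {
  // Simulates delayed result generation: a producer thread emits n results, then the end marker.
  def collectDelayed(n: Int, delayMillis: Long): List[Result] = {
    val queue = new ArrayBlockingQueue[Option[Result]](4)
    val producer = new Thread(new Runnable {
      def run(): Unit = {
        (1 to n).foreach { i =>
          Thread.sleep(delayMillis)
          queue.put(Some(Result(i)))
        }
        queue.put(None)
      }
    })
    producer.start()
    new QueueIterator(queue).toList
  }
}
```

Because the end marker stays in the look-ahead slot once it has been seen, repeated calls to hasNext keep returning false without blocking again. Note that in this sketch hasNext blocks until either an element or the end marker arrives, so it does not return true early while the next element is still pending. A helper like collectDelayed can be called from a unit test with a small delay to check that elements produced late still come out in order.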
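I followed jiro's suggestion and made a few adaptations where necessary. In general, I like the approach of implementing hasNext() and next() as ask messages sent to the actor. This ensures that only one thread modifies the queue at any time.
However, I am not sure about the performance of this implementation, since ask and Await.result will create two threads for every call to hasNext() and next().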
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash}
import akka.pattern.ask
import akka.util.Timeout

case object HasNext
case object GetNext
case class Result(value: Any)
case object Done

class ResultCollector extends Actor with Stash {
  val queue = scala.collection.mutable.Queue.empty[Result]

  // Results are still arriving: requests that cannot be answered yet are stashed.
  def collecting: Actor.Receive = {
    case HasNext => if (queue.isEmpty) stash() else sender ! true
    case GetNext => if (queue.isEmpty) stash() else sender ! queue.dequeue()
    case value: Result => unstashAll(); queue += value
    case Done => unstashAll(); context become serving
  }

  // Done has been received: answer from the remaining queue contents only.
  def serving: Actor.Receive = {
    case HasNext => sender ! queue.nonEmpty
    case GetNext => sender ! { if (queue.nonEmpty) queue.dequeue() else new NoSuchElementException }
  }

  def receive = collecting
}

class ResultStreamIteration(resultCollector: ActorRef) extends Iterator[Any] {
  implicit val timeout: Timeout = Timeout(30 seconds)

  override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
    case b: Boolean => b
  }

  override def next(): Any = Await.result(resultCollector ? GetNext, Duration.Inf) match {
    case Result(value: Any) => value
    case e: Throwable => throw e
  }
}

object Test extends App {
  implicit val exec = scala.concurrent.ExecutionContext.global
  val system = ActorSystem.create("Test")
  val actorRef = system.actorOf(Props[ResultCollector])
  Future {
    for (i <- 1 to 10000) actorRef ! Result(s"Result $i")
    actorRef ! Done
  }
  val iterator = new ResultStreamIteration(actorRef)
  while (iterator.hasNext()) println(iterator.next())
  system.terminate() // use system.shutdown() on Akka versions before 2.4
}