试图理解 Scala enumerator/iteratees
Trying to understand Scala enumerator/iteratees
我是 Scala 和 Play! 的新手,但在使用 Django 和 Python 构建 Web 应用程序以及一般编程方面有一定的经验。
我一直在做自己的练习以尝试提高我的理解力 - 只需从数据库中提取一些记录并将它们输出为 JSON 数组。我正在尝试使用 Enumarator/Iteratee 功能来执行此操作。
我的代码如下:
TestObjectController.scala:
def index = Action {
db.withConnection { conn=>
val stmt = conn.createStatement()
val result = stmt.executeQuery("select * from datatable")
logger.debug(result.toString)
val resultEnum:Enumerator[TestDataObject] = Enumerator.generateM {
logger.debug("called enumerator")
result.next() match {
case true =>
val obj = TestDataObject(result.getString("name"), result.getString("object_type"),
result.getString("quantity").toInt, result.getString("cost").toFloat)
logger.info(obj.toJsonString)
Future(Some(obj))
case false =>
logger.warn("reached end of iteration")
stmt.close()
null
}
}
val consume:Iteratee[TestDataObject,Seq[TestDataObject]] = {
Iteratee.fold[TestDataObject,Seq[TestDataObject]](Seq.empty[TestDataObject]) { (result,chunk) => result :+ chunk }
}
val newIteree = Iteratee.flatten(resultEnum(consume))
val eventuallyResult:Future[Seq[TestDataObject]] = newIteree.run
eventuallyResult.onSuccess { case x=> println(x)}
Ok("")
}
}
TestDataObject.scala:
package models
case class TestDataObject (name: String, objtype: String, quantity: Int, cost: Float){
def toJsonString: String = {
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.writeValueAsString(this)
}
}
我有两个主要问题:
如何从枚举器回调中发出输入已完成的信号?文档说 "this method takes a callback function e: => Future[Option[E]] that will be called each time the iteratee this Enumerator is applied to is ready to take some input." 但我无法传递我发现的任何类型的 EOF,因为它是错误的类型。将它包装在 Future 中没有帮助,但本能地我不确定这是正确的方法。
如何从控制器视图中获取 Future 到 return 的最终结果?我的理解是,我实际上需要暂停主线程以等待子线程完成,但我见过的唯一示例以及我在未来发现的唯一示例 class 是 onSuccess 回调 - 但是那我怎么才能return那个从视图呢? Iteratee.run 是否会阻塞直到所有输入都被消耗?
还有几个子问题,以帮助我理解:
- 当我的对象已经在 Future 中时,为什么我需要将它包装在 Some() 中? Some() 到底代表什么?
- 当我第一次 运行 代码时,我从 logger.info 得到一条记录,然后它报告 "reached end of iteration"。同一会话中的后续 运行s 什么也不调用。虽然我正在关闭声明,但为什么我第二次没有得到任何结果?我原以为它会无限期地循环,因为我不知道如何发出正确的循环终止信号。
非常感谢您的回答,我以为我已经掌握了窍门,但显然还没有!
How do i signal that the input is complete from the Enumerator callback?
你return一个Future(None)
.
How do I get the final result out of the Future to return from the controller view?
您可以使用 Action.async
(doc):
def index = Action.async {
db.withConnection { conn=>
...
val eventuallyResult:Future[Seq[TestDataObject]] = newIteree.run
eventuallyResult map { data =>
OK(...)
}
}
}
Why do I need to wrap my object in Some() when it's already in a Future? What exactly does Some() represent?
Future
表示获取下一个元素的(可能是异步的)处理。 Option
表示下一个元素的可用性:Some(x)
如果另一个元素可用,None
如果枚举完成。
我是 Scala 和 Play! 的新手,但在使用 Django 和 Python 构建 Web 应用程序以及一般编程方面有一定的经验。
我一直在做自己的练习以尝试提高我的理解力 - 只需从数据库中提取一些记录并将它们输出为 JSON 数组。我正在尝试使用 Enumarator/Iteratee 功能来执行此操作。
我的代码如下:
TestObjectController.scala:
def index = Action {
db.withConnection { conn=>
val stmt = conn.createStatement()
val result = stmt.executeQuery("select * from datatable")
logger.debug(result.toString)
val resultEnum:Enumerator[TestDataObject] = Enumerator.generateM {
logger.debug("called enumerator")
result.next() match {
case true =>
val obj = TestDataObject(result.getString("name"), result.getString("object_type"),
result.getString("quantity").toInt, result.getString("cost").toFloat)
logger.info(obj.toJsonString)
Future(Some(obj))
case false =>
logger.warn("reached end of iteration")
stmt.close()
null
}
}
val consume:Iteratee[TestDataObject,Seq[TestDataObject]] = {
Iteratee.fold[TestDataObject,Seq[TestDataObject]](Seq.empty[TestDataObject]) { (result,chunk) => result :+ chunk }
}
val newIteree = Iteratee.flatten(resultEnum(consume))
val eventuallyResult:Future[Seq[TestDataObject]] = newIteree.run
eventuallyResult.onSuccess { case x=> println(x)}
Ok("")
}
}
TestDataObject.scala:
package models
case class TestDataObject (name: String, objtype: String, quantity: Int, cost: Float){
def toJsonString: String = {
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.writeValueAsString(this)
}
}
我有两个主要问题:
如何从枚举器回调中发出输入已完成的信号?文档说 "this method takes a callback function e: => Future[Option[E]] that will be called each time the iteratee this Enumerator is applied to is ready to take some input." 但我无法传递我发现的任何类型的 EOF,因为它是错误的类型。将它包装在 Future 中没有帮助,但本能地我不确定这是正确的方法。
如何从控制器视图中获取 Future 到 return 的最终结果?我的理解是,我实际上需要暂停主线程以等待子线程完成,但我见过的唯一示例以及我在未来发现的唯一示例 class 是 onSuccess 回调 - 但是那我怎么才能return那个从视图呢? Iteratee.run 是否会阻塞直到所有输入都被消耗?
还有几个子问题,以帮助我理解:
- 当我的对象已经在 Future 中时,为什么我需要将它包装在 Some() 中? Some() 到底代表什么?
- 当我第一次 运行 代码时,我从 logger.info 得到一条记录,然后它报告 "reached end of iteration"。同一会话中的后续 运行s 什么也不调用。虽然我正在关闭声明,但为什么我第二次没有得到任何结果?我原以为它会无限期地循环,因为我不知道如何发出正确的循环终止信号。
非常感谢您的回答,我以为我已经掌握了窍门,但显然还没有!
How do i signal that the input is complete from the Enumerator callback?
你return一个Future(None)
.
How do I get the final result out of the Future to return from the controller view?
您可以使用 Action.async
(doc):
def index = Action.async {
db.withConnection { conn=>
...
val eventuallyResult:Future[Seq[TestDataObject]] = newIteree.run
eventuallyResult map { data =>
OK(...)
}
}
}
Why do I need to wrap my object in Some() when it's already in a Future? What exactly does Some() represent?
Future
表示获取下一个元素的(可能是异步的)处理。 Option
表示下一个元素的可用性:Some(x)
如果另一个元素可用,None
如果枚举完成。