从可能产生错误情况的 Akka 流准备正确的 HTTP 响应
Preparing proper HTTP Response from an Akka-stream that can produce error situations
我打算使用 Akka Streams 模拟一个简单的游戏 (HTTPReq/HTTPResp)。在一轮中,玩家被服务器挑战猜测一个数字。服务器检查玩家的反应,如果服务器持有的内容与玩家猜测的内容相同,则给玩家一分。
一个典型的流程是这样的:
- 玩家(已通过身份验证,已分配sessionID)请求开始一轮
- 服务器检查sessionID是否有效;如果不是,玩家会收到一条合适的消息
- 服务器生成一个数字并提供给玩家,连同一个 RoundID
...等等。没什么特别的。
这是类型和流程的粗略排列:
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Keep, Source}
import java.util.Random
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import scala.util.{Failure, Success}
sealed trait GuessingGameMessageToAndFro
case class StartARound(sessionID: String) extends GuessingGameMessageToAndFro
case class RoundStarted(sessionID: String, roundID: Int) extends GuessingGameMessageToAndFro
case class NumberGuessed(sessionID: String, roundID: Int, guessedNo: Int) extends GuessingGameMessageToAndFro
case class CorrectNumberGuessed(sessionID: String, nextRoundID: Int) extends GuessingGameMessageToAndFro
case class FinalRoundScore(sessionID: String, finalScore: Int) extends GuessingGameMessageToAndFro
case class MissingSession(sessionID: String) extends GuessingGameMessageToAndFro
case class IncorrectNumberGuessed(sessionID: String, clientGuessed: Int, serverChose: Int) extends GuessingGameMessageToAndFro
object SessionService {
def exists(m: StartARound) = if (m.sessionID.startsWith("1")) m else MissingSession(m.sessionID)
}
object NumberGenerator {
def numberToOfferToPlayer(m: GuessingGameMessageToAndFro) = {
m match {
case StartARound(s) => RoundStarted(s, new Random().nextInt())
case MissingSession(s) => m
case _ => throw new RuntimeException("Not yet implemented")
}
}
}
val sessionExistenceChecker: Flow[StartARound,GuessingGameMessageToAndFro,NotUsed]
= Flow.fromFunction(m => SessionService.exists(m))
val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_]
= Flow.fromFunction(m => NumberGenerator.numberToOfferToPlayer(m))
val s1 = StartARound("123")
val k =
Source
.single(s1)
.via(sessionExistenceChecker)
.via(guessNumberPreparator)
.toMat(Sink.head)(Keep.right)
val finallyObtained = k.run
finallyObtained.onComplete(v => {
v match {
case Success(x) => // Prepare proper HTTP Response
case Failure(ex) => // Prepare proper HTTP Response
}
})
我在 numberToOfferToPlayer() 中经历一个长模式匹配块的原因(我在这里显示了 2,但显然它的大小会随着每种可以流动的类型而增加)是因为如果像 sessionExistenceChecker 这样的运算符生成一个 MissingSession (这是一个错误条件),它必须遍历流的其余部分,不变 直到它到达 Future[Done] 阶段。事实上,问题更普遍:在任何阶段,适当的转换应该导致可接受的类型或错误类型(互斥)。如果我采用这种方法,模式匹配块将以不必要的重复为代价激增,如果不是丑陋的话。
我对我的这个解决方案感到不舒服。它变得冗长和笨拙。
不用说,我这里没有展示Akka-HTTP面向的部分(包括Routes)。上面的代码可以很容易地与路由处理程序拼接在一起。所以,我跳过了。
我的问题是:这种流的正确用法是什么?从概念上讲,如果一切正常,元素应该继续沿着流移动。但是,无论何时发生错误, (error) 元素都应该直接跳到最后阶段,跳过中间的所有其他阶段。可接受的建模方式是什么?
我浏览了很多 Whosebug 帖子,它们表明对于类似情况,应该采用 partition/merge 方式。我了解如何采用这种方法,但对于像我这样的简单案例,这似乎是不必要的工作。或者,我在这里完全偏离了目标吗?
任何关于指关节的提示、片段或说唱,将不胜感激。
使用偏函数
对于这个特定的用例,我通常同意分区和合并设置是 "unnecessary work"。问题中提到的其他堆栈帖子适用于只有 Flow
值可以组合而无法在 Flow 中操作底层逻辑的用例。
如果您能够修改底层逻辑,则存在更简单的解决方案。但解决方案并不严格属于 akka 的领域。相反,您可以利用 scala 本身提供的函数式编程结构。
如果将 numberTofferToPlayer
函数重写为 PartialFunction
:
object NumberGenerator {
val numberToOfferToPlayer : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
case s : StartARound => RoundStarted(s.sessionID, new Random().nextInt())
}
}
然后这个 PartialFunction
可以被提升到一个常规函数中,如果消息是 StartARound
类型,它将应用逻辑,如果是任何其他类型,则只转发消息。
此提升是通过 scala 中的 applyOrElse
method of PartialFunction in conjunction with the predefined identity
函数完成的,其中 returns 输入作为输出(即 "forwards" 输入):
import NumberGenerator.numberToOfferToPlayer
val messageForwarder : GuessingGameMessageToAndFro => GuessingGameMessageToAndFro =
identity[GuessingGameMessageToAndFro]
val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_] =
Flow fromFunction (numberToOfferToPlayer applyOrElse (_, messageForwarder))
更高层次的抽象
如果您有多个这样的 PartialFunction,您希望将转发逻辑添加到:
val foo : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
case r : RoundStarted => ???
}
val bar : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
case n : NumberGuessed => ???
}
然后你可以编写一个通用的 lifter 来抽象掉常规的函数创建:
val applyOrForward : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] => GuessingGameMessageToAndFro => GuessingGameMessageToAndFro =
((_ : PartialFunction[Int, Int]) applyOrElse ((_ : GuessingGameMessageToAndFro), messageForwader).curried
这个提升器会很好地清理你的代码:
val offerFlow = Flow fromFunction applyOrForward(numberToOfferToPlayer)
val fooFlow = Flow fromFunction applyOrForward(foo)
val barFlow = Flow fromFunction applyOrForward(bar)
然后可以按照问题描述的方式组合这些流程:
val combinedFlow = offerFlow via fooFlow via barFlow
同样,您可以通过先组合 PartialFunctions 然后从组合中创建单个 Flow 来获得相同的结果。这对于 akka 之外的单元测试很有用:
val combinedPartial = numberToOfferToPlayer orElse foo orElse bar
//no akka test kit necessary
assert {
val testError = MissingSession("testId")
applyOrForward(combinedPartial)(testError) equals testError
}
//nothing much to test
val otherCombinedFlow = Flow fromFunction applyOrForward(combinedPartial)
我打算使用 Akka Streams 模拟一个简单的游戏 (HTTPReq/HTTPResp)。在一轮中,玩家被服务器挑战猜测一个数字。服务器检查玩家的反应,如果服务器持有的内容与玩家猜测的内容相同,则给玩家一分。
一个典型的流程是这样的:
- 玩家(已通过身份验证,已分配sessionID)请求开始一轮
- 服务器检查sessionID是否有效;如果不是,玩家会收到一条合适的消息
- 服务器生成一个数字并提供给玩家,连同一个 RoundID
...等等。没什么特别的。
这是类型和流程的粗略排列:
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Keep, Source}
import java.util.Random
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import scala.util.{Failure, Success}
sealed trait GuessingGameMessageToAndFro
case class StartARound(sessionID: String) extends GuessingGameMessageToAndFro
case class RoundStarted(sessionID: String, roundID: Int) extends GuessingGameMessageToAndFro
case class NumberGuessed(sessionID: String, roundID: Int, guessedNo: Int) extends GuessingGameMessageToAndFro
case class CorrectNumberGuessed(sessionID: String, nextRoundID: Int) extends GuessingGameMessageToAndFro
case class FinalRoundScore(sessionID: String, finalScore: Int) extends GuessingGameMessageToAndFro
case class MissingSession(sessionID: String) extends GuessingGameMessageToAndFro
case class IncorrectNumberGuessed(sessionID: String, clientGuessed: Int, serverChose: Int) extends GuessingGameMessageToAndFro
object SessionService {
def exists(m: StartARound) = if (m.sessionID.startsWith("1")) m else MissingSession(m.sessionID)
}
object NumberGenerator {
def numberToOfferToPlayer(m: GuessingGameMessageToAndFro) = {
m match {
case StartARound(s) => RoundStarted(s, new Random().nextInt())
case MissingSession(s) => m
case _ => throw new RuntimeException("Not yet implemented")
}
}
}
val sessionExistenceChecker: Flow[StartARound,GuessingGameMessageToAndFro,NotUsed]
= Flow.fromFunction(m => SessionService.exists(m))
val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_]
= Flow.fromFunction(m => NumberGenerator.numberToOfferToPlayer(m))
val s1 = StartARound("123")
val k =
Source
.single(s1)
.via(sessionExistenceChecker)
.via(guessNumberPreparator)
.toMat(Sink.head)(Keep.right)
val finallyObtained = k.run
finallyObtained.onComplete(v => {
v match {
case Success(x) => // Prepare proper HTTP Response
case Failure(ex) => // Prepare proper HTTP Response
}
})
我在 numberToOfferToPlayer() 中经历一个长模式匹配块的原因(我在这里显示了 2,但显然它的大小会随着每种可以流动的类型而增加)是因为如果像 sessionExistenceChecker 这样的运算符生成一个 MissingSession (这是一个错误条件),它必须遍历流的其余部分,不变 直到它到达 Future[Done] 阶段。事实上,问题更普遍:在任何阶段,适当的转换应该导致可接受的类型或错误类型(互斥)。如果我采用这种方法,模式匹配块将以不必要的重复为代价激增,如果不是丑陋的话。
我对我的这个解决方案感到不舒服。它变得冗长和笨拙。
不用说,我这里没有展示Akka-HTTP面向的部分(包括Routes)。上面的代码可以很容易地与路由处理程序拼接在一起。所以,我跳过了。
我的问题是:这种流的正确用法是什么?从概念上讲,如果一切正常,元素应该继续沿着流移动。但是,无论何时发生错误, (error) 元素都应该直接跳到最后阶段,跳过中间的所有其他阶段。可接受的建模方式是什么?
我浏览了很多 Whosebug 帖子,它们表明对于类似情况,应该采用 partition/merge 方式。我了解如何采用这种方法,但对于像我这样的简单案例,这似乎是不必要的工作。或者,我在这里完全偏离了目标吗?
任何关于指关节的提示、片段或说唱,将不胜感激。
使用偏函数
对于这个特定的用例,我通常同意分区和合并设置是 "unnecessary work"。问题中提到的其他堆栈帖子适用于只有 Flow
值可以组合而无法在 Flow 中操作底层逻辑的用例。
如果您能够修改底层逻辑,则存在更简单的解决方案。但解决方案并不严格属于 akka 的领域。相反,您可以利用 scala 本身提供的函数式编程结构。
如果将 numberTofferToPlayer
函数重写为 PartialFunction
:
object NumberGenerator {
val numberToOfferToPlayer : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
case s : StartARound => RoundStarted(s.sessionID, new Random().nextInt())
}
}
然后这个 PartialFunction
可以被提升到一个常规函数中,如果消息是 StartARound
类型,它将应用逻辑,如果是任何其他类型,则只转发消息。
此提升是通过 scala 中的 applyOrElse
method of PartialFunction in conjunction with the predefined identity
函数完成的,其中 returns 输入作为输出(即 "forwards" 输入):
import NumberGenerator.numberToOfferToPlayer
val messageForwarder : GuessingGameMessageToAndFro => GuessingGameMessageToAndFro =
identity[GuessingGameMessageToAndFro]
val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_] =
Flow fromFunction (numberToOfferToPlayer applyOrElse (_, messageForwarder))
更高层次的抽象
如果您有多个这样的 PartialFunction,您希望将转发逻辑添加到:
val foo : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
case r : RoundStarted => ???
}
val bar : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
case n : NumberGuessed => ???
}
然后你可以编写一个通用的 lifter 来抽象掉常规的函数创建:
val applyOrForward : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] => GuessingGameMessageToAndFro => GuessingGameMessageToAndFro =
((_ : PartialFunction[Int, Int]) applyOrElse ((_ : GuessingGameMessageToAndFro), messageForwader).curried
这个提升器会很好地清理你的代码:
val offerFlow = Flow fromFunction applyOrForward(numberToOfferToPlayer)
val fooFlow = Flow fromFunction applyOrForward(foo)
val barFlow = Flow fromFunction applyOrForward(bar)
然后可以按照问题描述的方式组合这些流程:
val combinedFlow = offerFlow via fooFlow via barFlow
同样,您可以通过先组合 PartialFunctions 然后从组合中创建单个 Flow 来获得相同的结果。这对于 akka 之外的单元测试很有用:
val combinedPartial = numberToOfferToPlayer orElse foo orElse bar
//no akka test kit necessary
assert {
val testError = MissingSession("testId")
applyOrForward(combinedPartial)(testError) equals testError
}
//nothing much to test
val otherCombinedFlow = Flow fromFunction applyOrForward(combinedPartial)