Supervision.Resume 实施时的 Akka Stream 测试流程
Akka Stream test flow when Supervision.Resume implemented
我最近实现了一个 akka-stream 流(Flow),它解析 JSON 消息,验证给定的键(destination_region)是否存在,并向下一阶段传递一个包含原始消息和 destination_region 字符串的 case class。
我实现了一个自定义的 Decider(监督决策器):在遇到任何解析错误或键缺失(解码)错误时,它会先记录异常,然后触发 Supervision.Resume。
一个简约的实现看起来像:
package com.example.stages
import com.example.helpers.EitherHelpers._
/*
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
object EitherHelpers {
implicit class ErrorEither[L <: Throwable, R](val either: Either[L, R]) extends AnyVal {
def asFuture: Future[R] = either.fold(Future.failed, Future.successful)
def asTry: Try[R] = either.fold(Failure.apply, Success.apply)
}
}
*/
import scala.concurrent.ExecutionContext
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision
import software.amazon.awssdk.services.sqs.model.Message
import io.circe.parser.parse
import io.circe.{DecodingFailure, ParsingFailure}
/**
 * Builds the flow that validates the JSON body of incoming SQS messages.
 *
 * Each message body is parsed as JSON and its `destination_region` string
 * field is read; on success the original message is emitted together with
 * that region as a [[MessageWithRegion]].
 */
object MessageContentValidationFlow {

  /**
   * @param executionContext context on which the parse/decode futures are chained
   * @return a flow running up to 2 validations concurrently. A `ParsingFailure`
   *         or `DecodingFailure` is printed and answered with
   *         `Supervision.Resume`; any other failure is answered with
   *         `Supervision.Stop`.
   */
  def apply()(
      implicit executionContext: ExecutionContext): Flow[Message, MessageWithRegion, NotUsed] = {
    // Only circe parsing/decoding errors are resumable; everything else stops the stream.
    val resumeOnJsonFailure: Supervision.Decider = {
      case jsonError @ (_: DecodingFailure | _: ParsingFailure) =>
        println(jsonError)
        Supervision.Resume
      case _ =>
        Supervision.Stop
    }

    val validation = Flow[Message].mapAsync[MessageWithRegion](2) { message =>
      println(s"Received message: $message")
      // Either results are lifted into Futures so that a Left becomes a failed
      // Future, which is what the supervision decider gets to see.
      parse(message.body()).asFuture.flatMap { json =>
        json.hcursor
          .downField("destination_region")
          .as[String]
          .asFuture
          .map(region => MessageWithRegion(message, region))
      }
    }

    validation.withAttributes(supervisionStrategy(resumeOnJsonFailure))
  }
}
/**
 * A message paired with the region string extracted from it.
 *
 * @param message the original SQS message
 * @param region  the region value associated with the message
 */
final case class MessageWithRegion(message: Message, region: String)
我设法测试了消息有效的情况,但是我不知道如何在 ParsingFailure
或 DecodingFailure
的情况下测试流程。我在下面的实现中尝试了几乎所有可用于 sub
的方法:
package com.example.stages
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import io.circe.generic.JsonCodec, io.circe.syntax._
import io.circe.generic.auto._
import software.amazon.awssdk.services.sqs.model.Message
import org.scalatest.FlatSpec
/**
 * Test fixture describing a JSON message body with a single
 * `destination_region` field; serialized with circe to build message bodies.
 *
 * @param destination_region value written under the `destination_region` JSON key
 */
@JsonCodec final case class MessageBody(destination_region: String)
/**
 * Specs for [[MessageContentValidationFlow]].
 *
 * Every test materializes its own stream so that demand or elements left over
 * from one test cannot leak into the next one, and the actor system is
 * terminated once all tests have run.
 */
class MessageContentValidationFlowSpecs extends FlatSpec with org.scalatest.BeforeAndAfterAll {
  import scala.concurrent.Await
  import scala.concurrent.duration._

  implicit val system: ActorSystem = ActorSystem("MessageContentValidationFlow")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: scala.concurrent.ExecutionContextExecutor = system.dispatcher

  private val validRegion = "eu-west-1"
  private val validBody = MessageBody(validRegion).asJson.toString()

  /** Shuts the actor system down so its threads do not outlive the suite. */
  override def afterAll(): Unit = {
    try super.afterAll()
    finally Await.ready(system.terminate(), 10.seconds)
  }

  /** Materializes a fresh publisher/subscriber probe pair around the flow under test. */
  private def materializeProbes() =
    TestSource.probe[Message]
      .via(MessageContentValidationFlow())
      .toMat(TestSink.probe[MessageWithRegion])(Keep.both)
      .run()

  /** Builds an SQS message carrying the given body. */
  private def messageWithBody(body: String): Message =
    Message.builder().body(body).build()

  "MessageContentValidationFlow" should "process valid messages" in {
    val (pub, sub) = materializeProbes()
    val validMessage = messageWithBody(validBody)

    sub.request(n = 1)
    pub.sendNext(validMessage)

    // expectNext (not requestNext) because demand was already signalled above.
    sub.expectNext(MessageWithRegion(message = validMessage, region = validRegion))
  }

  it should "trigger Supervision.Resume with empty messages" in {
    val (pub, sub) = materializeProbes()
    val emptyMessage = messageWithBody("")
    assert(emptyMessage.body() == "")
    val validMessage = messageWithBody(validBody)

    // Resume drops the failing element and keeps the stream running, so the
    // stream does NOT complete here. What can be observed through the probes is
    // that the invalid element never arrives and the next valid one still does.
    sub.request(n = 2)
    pub.sendNext(emptyMessage)
    pub.sendNext(validMessage)
    sub.expectNext(MessageWithRegion(message = validMessage, region = validRegion))

    // Completion only happens once upstream completes.
    pub.sendComplete()
    sub.expectComplete()
  }

  it should "trigger Supervision.Resume when destination_region is missing" in {
    val (pub, sub) = materializeProbes()
    // Well-formed JSON without the required key: parsing succeeds, decoding fails.
    val messageWithoutRegion = messageWithBody("""{"unrelated_key": "value"}""")
    val validMessage = messageWithBody(validBody)

    sub.request(n = 2)
    pub.sendNext(messageWithoutRegion)
    pub.sendNext(validMessage)
    sub.expectNext(MessageWithRegion(message = validMessage, region = validRegion))

    pub.sendComplete()
    sub.expectComplete()
  }
}
有谁知道如何测试 Supervision.Resume
被触发以及自定义决策程序捕获了哪个异常?
由于 Supervision.Resume
会丢弃出错的元素并继续处理流,测试监督策略的一种方法是:运行一个同时包含 "good" 和 "bad" 元素的流,并确认物化值是否仅由 "good" 元素组成。例如:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import org.scalatest._
import scala.concurrent._
import scala.concurrent.duration._
/**
 * Demonstrates how to verify a resuming supervision strategy: run a mix of
 * good and bad elements through the flow and check that only the good ones
 * reach the sink.
 */
class MyTest extends FlatSpec with Matchers {
  implicit val system = ActorSystem("MyTest")
  implicit val materializer = ActorMaterializer()

  // Attribute that makes a stage drop the failing element and carry on.
  private val resumeOnFailure =
    ActorAttributes.supervisionStrategy(Supervision.resumingDecider)

  // Passes every number through unchanged, except 2, which throws.
  val resumingFlow = Flow[Int]
    .map { n =>
      if (n == 2) throw new RuntimeException("bad number")
      else n
    }
    .withAttributes(resumeOnFailure)

  "resumingFlow" should "drop the number 2" in {
    val eventualElements = Source(1 to 5).via(resumingFlow).runWith(Sink.seq)
    val result: collection.immutable.Seq[Int] = Await.result(eventualElements, 5.seconds)
    result shouldBe List(1, 3, 4, 5)
  }
}
在您的情况下,创建一个包含有效 Message
对象和至少一个无效 Message
对象的流。
我最近实现了一个 akka-stream 流(Flow),它解析 JSON 消息,验证给定的键(destination_region)是否存在,并向下一阶段传递一个包含原始消息和 destination_region 字符串的 case class。
我实现了一个自定义的 Decider(监督决策器):在遇到任何解析错误或键缺失(解码)错误时,它会先记录异常,然后触发 Supervision.Resume。
一个简约的实现看起来像:
package com.example.stages
import com.example.helpers.EitherHelpers._
/*
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
object EitherHelpers {
implicit class ErrorEither[L <: Throwable, R](val either: Either[L, R]) extends AnyVal {
def asFuture: Future[R] = either.fold(Future.failed, Future.successful)
def asTry: Try[R] = either.fold(Failure.apply, Success.apply)
}
}
*/
import scala.concurrent.ExecutionContext
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision
import software.amazon.awssdk.services.sqs.model.Message
import io.circe.parser.parse
import io.circe.{DecodingFailure, ParsingFailure}
/**
 * Builds the flow that validates the JSON body of incoming SQS messages.
 *
 * Each message body is parsed as JSON and its `destination_region` string
 * field is read; on success the original message is emitted together with
 * that region as a [[MessageWithRegion]].
 */
object MessageContentValidationFlow {

  /**
   * @param executionContext context on which the parse/decode futures are chained
   * @return a flow running up to 2 validations concurrently. A `ParsingFailure`
   *         or `DecodingFailure` is printed and answered with
   *         `Supervision.Resume`; any other failure is answered with
   *         `Supervision.Stop`.
   */
  def apply()(
      implicit executionContext: ExecutionContext): Flow[Message, MessageWithRegion, NotUsed] = {
    // Only circe parsing/decoding errors are resumable; everything else stops the stream.
    val resumeOnJsonFailure: Supervision.Decider = {
      case jsonError @ (_: DecodingFailure | _: ParsingFailure) =>
        println(jsonError)
        Supervision.Resume
      case _ =>
        Supervision.Stop
    }

    val validation = Flow[Message].mapAsync[MessageWithRegion](2) { message =>
      println(s"Received message: $message")
      // Either results are lifted into Futures so that a Left becomes a failed
      // Future, which is what the supervision decider gets to see.
      parse(message.body()).asFuture.flatMap { json =>
        json.hcursor
          .downField("destination_region")
          .as[String]
          .asFuture
          .map(region => MessageWithRegion(message, region))
      }
    }

    validation.withAttributes(supervisionStrategy(resumeOnJsonFailure))
  }
}
/**
 * A message paired with the region string extracted from it.
 *
 * @param message the original SQS message
 * @param region  the region value associated with the message
 */
final case class MessageWithRegion(message: Message, region: String)
我设法测试了消息有效的情况,但是我不知道如何在 ParsingFailure
或 DecodingFailure
的情况下测试流程。我在下面的实现中尝试了几乎所有可用于 sub
的方法:
package com.example.stages
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import io.circe.generic.JsonCodec, io.circe.syntax._
import io.circe.generic.auto._
import software.amazon.awssdk.services.sqs.model.Message
import org.scalatest.FlatSpec
/**
 * Test fixture describing a JSON message body with a single
 * `destination_region` field; serialized with circe to build message bodies.
 *
 * @param destination_region value written under the `destination_region` JSON key
 */
@JsonCodec final case class MessageBody(destination_region: String)
/**
 * Specs for [[MessageContentValidationFlow]].
 *
 * Every test materializes its own stream so that demand or elements left over
 * from one test cannot leak into the next one, and the actor system is
 * terminated once all tests have run.
 */
class MessageContentValidationFlowSpecs extends FlatSpec with org.scalatest.BeforeAndAfterAll {
  import scala.concurrent.Await
  import scala.concurrent.duration._

  implicit val system: ActorSystem = ActorSystem("MessageContentValidationFlow")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: scala.concurrent.ExecutionContextExecutor = system.dispatcher

  private val validRegion = "eu-west-1"
  private val validBody = MessageBody(validRegion).asJson.toString()

  /** Shuts the actor system down so its threads do not outlive the suite. */
  override def afterAll(): Unit = {
    try super.afterAll()
    finally Await.ready(system.terminate(), 10.seconds)
  }

  /** Materializes a fresh publisher/subscriber probe pair around the flow under test. */
  private def materializeProbes() =
    TestSource.probe[Message]
      .via(MessageContentValidationFlow())
      .toMat(TestSink.probe[MessageWithRegion])(Keep.both)
      .run()

  /** Builds an SQS message carrying the given body. */
  private def messageWithBody(body: String): Message =
    Message.builder().body(body).build()

  "MessageContentValidationFlow" should "process valid messages" in {
    val (pub, sub) = materializeProbes()
    val validMessage = messageWithBody(validBody)

    sub.request(n = 1)
    pub.sendNext(validMessage)

    // expectNext (not requestNext) because demand was already signalled above.
    sub.expectNext(MessageWithRegion(message = validMessage, region = validRegion))
  }

  it should "trigger Supervision.Resume with empty messages" in {
    val (pub, sub) = materializeProbes()
    val emptyMessage = messageWithBody("")
    assert(emptyMessage.body() == "")
    val validMessage = messageWithBody(validBody)

    // Resume drops the failing element and keeps the stream running, so the
    // stream does NOT complete here. What can be observed through the probes is
    // that the invalid element never arrives and the next valid one still does.
    sub.request(n = 2)
    pub.sendNext(emptyMessage)
    pub.sendNext(validMessage)
    sub.expectNext(MessageWithRegion(message = validMessage, region = validRegion))

    // Completion only happens once upstream completes.
    pub.sendComplete()
    sub.expectComplete()
  }

  it should "trigger Supervision.Resume when destination_region is missing" in {
    val (pub, sub) = materializeProbes()
    // Well-formed JSON without the required key: parsing succeeds, decoding fails.
    val messageWithoutRegion = messageWithBody("""{"unrelated_key": "value"}""")
    val validMessage = messageWithBody(validBody)

    sub.request(n = 2)
    pub.sendNext(messageWithoutRegion)
    pub.sendNext(validMessage)
    sub.expectNext(MessageWithRegion(message = validMessage, region = validRegion))

    pub.sendComplete()
    sub.expectComplete()
  }
}
有谁知道如何测试 Supervision.Resume
被触发以及自定义决策程序捕获了哪个异常?
由于 Supervision.Resume
会丢弃出错的元素并继续处理流,测试监督策略的一种方法是:运行一个同时包含 "good" 和 "bad" 元素的流,并确认物化值是否仅由 "good" 元素组成。例如:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import org.scalatest._
import scala.concurrent._
import scala.concurrent.duration._
/**
 * Demonstrates how to verify a resuming supervision strategy: run a mix of
 * good and bad elements through the flow and check that only the good ones
 * reach the sink.
 */
class MyTest extends FlatSpec with Matchers {
  implicit val system = ActorSystem("MyTest")
  implicit val materializer = ActorMaterializer()

  // Attribute that makes a stage drop the failing element and carry on.
  private val resumeOnFailure =
    ActorAttributes.supervisionStrategy(Supervision.resumingDecider)

  // Passes every number through unchanged, except 2, which throws.
  val resumingFlow = Flow[Int]
    .map { n =>
      if (n == 2) throw new RuntimeException("bad number")
      else n
    }
    .withAttributes(resumeOnFailure)

  "resumingFlow" should "drop the number 2" in {
    val eventualElements = Source(1 to 5).via(resumingFlow).runWith(Sink.seq)
    val result: collection.immutable.Seq[Int] = Await.result(eventualElements, 5.seconds)
    result shouldBe List(1, 3, 4, 5)
  }
}
在您的情况下,创建一个包含有效 Message
对象和至少一个无效 Message
对象的流。