对 akka 流的子类型进行分区

Partitioning on SubType for akka streams

我有一个 akka 流,其中有一个 ADT 形式。

sealed trait Message
sealed trait ThisMessage extends Message
sealed trait ThatMessage extends Message

现在我有一个 This Message Handler Flow 和 That Message Handler Flow。我有一个接受消息类型的入口流。

为了创建拆分,我有以下分区程序。我对分区程序函数有以下定义。

 /**
  * Creates a Partition stage that, given a type A, makes a decision to whether to partition to subtype B or subtype C
  *
  * @tparam A type of input
  * @tparam B type of output on the first outlet.
  * @tparam C type of output on the second outlet.
  *
  * @return A partition stage
  */
  def binaryPartitionByType[A, B <: A, C <: A](): Graph[FanOutShape2[A, B, C], NotUsed] =
GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
  import GraphDSL.Implicits._

  // This is wrong, but I have no idea how to write this.
  val partitioner: UniformFanOutShape[A, A] = builder.add(Partition[A](2, {
    case _: B => 0
    case _: C => 1
  }))

  new FanOutShape2(partitioner.in, partitioner.out(0).outlet, partitioner.out(1).outlet)
}

我想用上面的方法,在type params中使用ADT来初始化一个partitioner。

编译器抛出这个错误。

Error:(63, 7) type mismatch;
 found   : akka.stream.FanOutShape2[A,A,A]
 required: akka.stream.FanOutShape2[A,B,C]
      new FanOutShape2(partitioner.in, partitioner.out(0).outlet, 
partitioner.out(1).outlet)

据我了解,分区对象只有入口(在本例中为 A,参数化类型。

有人知道我该如何解决这个问题吗?

问题是您正试图颠覆类型系统。 UniformFanOutShape 被命名为 "uniform" 因为它的所有输出都是相同的类型。如果不是这样,您一开始就不需要创建额外的 FanOutShape2。如果你要颠覆类型系统,你应该始终如一地做,所以你应该改变 Outlet 的类型。尝试这样的事情:

new FanOutShape2(partitioner.in, partitioner.out(0).outlet.as[B], partitioner.out(1).outlet.as[C])

这是从 builder.add(Partition[A]()) 生成的 UniformFanOutShape[A, A] 实例化 FanOutShape2[A, B<:A, C<:A] 的一种方法:

import akka.stream.scaladsl._
import akka.stream.{Graph, FanOutShape2}
import akka.NotUsed
import scala.reflect.ClassTag

def binaryPartitionByType[A, B <: A : ClassTag, C <: A : ClassTag](): Graph[FanOutShape2[A, B, C], NotUsed] =
  GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
    import GraphDSL.Implicits._

    val partitioner = builder.add(Partition[A](2, {
      case _: B => 0
      case _: C => 1
    }))

    val partitionB = builder.add(Flow[A].collect{ case b: B => b })
    val partitionC = builder.add(Flow[A].collect{ case c: C => c })

    partitioner.out(0) ~> partitionB
    partitioner.out(1) ~> partitionC

    new FanOutShape2(partitioner.in, partitionB.out, partitionC.out)
}

// binaryPartitionByType: [A, B <: A, C <: A]()(
//   implicit evidence: scala.reflect.ClassTag[B], implicit evidence: scala.reflect.ClassTag[C]
// ) akka.stream.Graph[akka.stream.FanOutShape2[A,B,C],akka.NotUsed]

请注意,需要 ClassTag 以避免类型擦除。