如何通过子类型 B<:A 过滤 Stream[Future[A]]-like class?

How to filter a Stream[Future[A]]-like class by sub-type B<:A?

我有这个 class 管理异步消息流的接收。它有类型参数,比如 A,我喜欢实现一个按类型过滤这些消息的函数,比如 B<:A

但由于类型擦除,我的第一个实现无法正常工作(请参见下面的示例)。有什么好的方法吗?

这是我的问题的一个简化示例:

package test.filterByType

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.Future

trait MessageStream[A]{ self =>

  def next(): Future[A]

  def filter[B<:A]: MessageStream[B] = new MessageStream[B]{
    def next(): Future[B] = self.next.flatMap{
      case b: B => Future.successful(b)
      case _    => next()
      }
  }
}

case class MessageStreamImpl[A](msg: IndexedSeq[A]) extends MessageStream[A]{
  private var index = 0
  def next() = {
    index += 1
    Future.successful(msg(index-1))
  }
}

object Main{
  trait A
  case class B(i: Int) extends A
  case class C(i: Int) extends A

  def main(args: Array[String]){
    val msg: IndexedSeq[A] = (1 to 10).map{ i => if(i%2==0) B(i) else C(i) }
    val streamOfA = MessageStreamImpl(msg)
    val streamOfB = streamOfA.filter[B]

    val b: B = Await.result(streamOfB.next(), 1 second)
  }
}

在编译时,我得到 warning: abstract type pattern B is unchecked since it is eliminated by erasure,确实代码不起作用。如果我执行 Main,我会得到错误: lang.ClassCastException: test.filterByType.Main$C cannot be cast to test.filterByType.Main$B

这是因为它没有过滤掉第一个列表项 C(1)

这个小调整就可以了

import scala.reflect.ClassTag
def filter[B<:A](implicit C: ClassTag[B]): MessageStream[B] = new MessageStream[B]{
    def next(): Future[B] = self.next.flatMap{
        case b: B => Future.successful(b)
        case _    => next()
    }
}

我找到了一个可行的替代方案:

def collect[B<:A](f: PartialFunction[A,B]) = new MessageStream[B] {
  override def next(): Future[B] = self.next().flatMap { m =>
    if (f.isDefinedAt(m)) Future.successful(f(m))
    else next()
  }
}