使用 TypeTag 克服 Scala 中的类型擦除
Overcoming Type Erasure in Scala with TypeTag
我有以下情况class:
trait Event
object Event {
case class ProducerStreamActivated[T <: KafkaMessage](kafkaTopic: String, stream: SourceQueueWithComplete[T]) extends Event
}
trait KafkaMessage
object KafkaMessage {
case class DefaultMessage(message: String, timestamp: DateTime) extends KafkaMessage {
def this() = this("DEFAULT-EMPTY-MESSAGE", DateTime.now(DateTimeZone.UTC))
}
case class DefaultMessageBundle(messages: Seq[DefaultMessage], timeStamp: DateTime) extends KafkaMessage {
def this() = this(Seq.empty, DateTime.now(DateTimeZone.UTC))
}
}
在我的一个 Actor 中,我有以下识别实际类型的方法:
class KafkaPublisher[T <: KafkaMessage: TypeTag] extends Actor {
def paramInfo[T](x: T)(implicit tag: TypeTag[T]): Unit = {
val targs = typeOf[T] match { case TypeRef(_, _, args) => args }
println(s"type of $x has type arguments $targs")
}
implicit val system = context.system
val log = Logging(system, this.getClass.getName)
override final def receive = {
case ProducerStreamActivated(_, stream) =>
paramInfo(stream)
log.info(s"Activated stream for Kafka Producer with ActorName >> ${self.path.name} << ActorPath >> ${self.path} <<")
context.become(active(stream))
case other =>
log.warning("KafkaPublisher got some unknown message while producing: " + other)
}
def active(stream: SourceQueueWithComplete[KafkaMessage]): Receive = {
case msg: T =>
stream.offer(msg)
case other =>
log.warning("KafkaPublisher got the unknown message while producing: " + other)
}
}
object KafkaPublisher {
def props[T <: KafkaMessage: TypeTag] =
Props(new KafkaPublisher[T])
}
我在这样的父 Actor 中创建 ProducerStreamActivated(...) 实例:
val stream = producerStream[DefaultMessage](producerProperties)
def producerStream[T: Converter](producerProperties: Map[String, String]): SourceQueueWithComplete[T] = {
if (Try(producerProperties("isEnabled").toBoolean).getOrElse(false)) {
log.info(s"Kafka is enabled for topic ${producerProperties("publish-topic")}")
val streamFlow = flowToKafka[T](producerProperties)
val streamSink = sink(producerProperties)
source[T].via(streamFlow).to(streamSink).run()
} else {
// We just Log to the Console and by pass all Kafka communication
log.info(s"Kafka is disabled for topic ${producerProperties("publish-topic")}")
source[T].via(flowToLog[T](log)).to(Sink.ignore).run()
}
}
现在,当我在我的子 actor 中打印包含在流 SourceQueueWithComplete[T] 中的 Type 时,我看到了包含的基础 class KafkaMessage 而不是预期的 DefaultMessage。有什么想法可以缓解这种情况吗?
在你的 KafkaPublisher
的 receive
方法中,你在没有任何类型参数的 ProducerStreamActivated
上进行模式匹配(由于类型擦除,你不能匹配有参数的) ,并且在该方法中,传递给 paramInfo
的隐式 TypeTag
是在编译时决定的,此时它只是一个 TypeTag[KafkaMessage]
.
您应该能够解决此问题的一种方法是让 ProducerStreamActivated
class 带有自己的类型标签,即:
case class ProducerStreamActivated[T <: KafkaMessage](kafkaTopic: String, stream: SourceQueueWithComplete[T])(implicit val tag: TypeTag[T]) extends Event
然后不要在 receive
方法中隐式调用一个,只需执行 msg.tag
.
这应该可行,因为在您实际创建 ProducerStreamActivated
时,您确实拥有编译时类型参数信息(它是 DefaultMessage
),因此这将是编译器的类型标签填写,然后您可以保留对该内容的引用。
我有以下情况class:
trait Event
object Event {
case class ProducerStreamActivated[T <: KafkaMessage](kafkaTopic: String, stream: SourceQueueWithComplete[T]) extends Event
}
trait KafkaMessage
object KafkaMessage {
case class DefaultMessage(message: String, timestamp: DateTime) extends KafkaMessage {
def this() = this("DEFAULT-EMPTY-MESSAGE", DateTime.now(DateTimeZone.UTC))
}
case class DefaultMessageBundle(messages: Seq[DefaultMessage], timeStamp: DateTime) extends KafkaMessage {
def this() = this(Seq.empty, DateTime.now(DateTimeZone.UTC))
}
}
在我的一个 Actor 中,我有以下识别实际类型的方法:
class KafkaPublisher[T <: KafkaMessage: TypeTag] extends Actor {
def paramInfo[T](x: T)(implicit tag: TypeTag[T]): Unit = {
val targs = typeOf[T] match { case TypeRef(_, _, args) => args }
println(s"type of $x has type arguments $targs")
}
implicit val system = context.system
val log = Logging(system, this.getClass.getName)
override final def receive = {
case ProducerStreamActivated(_, stream) =>
paramInfo(stream)
log.info(s"Activated stream for Kafka Producer with ActorName >> ${self.path.name} << ActorPath >> ${self.path} <<")
context.become(active(stream))
case other =>
log.warning("KafkaPublisher got some unknown message while producing: " + other)
}
def active(stream: SourceQueueWithComplete[KafkaMessage]): Receive = {
case msg: T =>
stream.offer(msg)
case other =>
log.warning("KafkaPublisher got the unknown message while producing: " + other)
}
}
object KafkaPublisher {
def props[T <: KafkaMessage: TypeTag] =
Props(new KafkaPublisher[T])
}
我在这样的父 Actor 中创建 ProducerStreamActivated(...) 实例:
val stream = producerStream[DefaultMessage](producerProperties)
def producerStream[T: Converter](producerProperties: Map[String, String]): SourceQueueWithComplete[T] = {
if (Try(producerProperties("isEnabled").toBoolean).getOrElse(false)) {
log.info(s"Kafka is enabled for topic ${producerProperties("publish-topic")}")
val streamFlow = flowToKafka[T](producerProperties)
val streamSink = sink(producerProperties)
source[T].via(streamFlow).to(streamSink).run()
} else {
// We just Log to the Console and by pass all Kafka communication
log.info(s"Kafka is disabled for topic ${producerProperties("publish-topic")}")
source[T].via(flowToLog[T](log)).to(Sink.ignore).run()
}
}
现在,当我在我的子 actor 中打印包含在流 SourceQueueWithComplete[T] 中的 Type 时,我看到了包含的基础 class KafkaMessage 而不是预期的 DefaultMessage。有什么想法可以缓解这种情况吗?
在你的 KafkaPublisher
的 receive
方法中,你在没有任何类型参数的 ProducerStreamActivated
上进行模式匹配(由于类型擦除,你不能匹配有参数的) ,并且在该方法中,传递给 paramInfo
的隐式 TypeTag
是在编译时决定的,此时它只是一个 TypeTag[KafkaMessage]
.
您应该能够解决此问题的一种方法是让 ProducerStreamActivated
class 带有自己的类型标签,即:
case class ProducerStreamActivated[T <: KafkaMessage](kafkaTopic: String, stream: SourceQueueWithComplete[T])(implicit val tag: TypeTag[T]) extends Event
然后不要在 receive
方法中隐式调用一个,只需执行 msg.tag
.
这应该可行,因为在您实际创建 ProducerStreamActivated
时,您确实拥有编译时类型参数信息(它是 DefaultMessage
),因此这将是编译器的类型标签填写,然后您可以保留对该内容的引用。