具有 OR 和一种通用位置方法的几种类型

Several types with OR with one generic position method

我有两种类型的数据:String 和 SpecificRecordBase 的扩展以及具有相同业务逻辑的几种方法来处理这些数据,除了使用我无法更改的另一个库中的 java 方法。

  def createResultStreamSpecificRecordBase[T <: SpecificRecordBase](topic: String, source: DataStream[KafkaSourceType])(
      implicit tag: ClassTag[T]): DataStream[T] = {
      // contain calling java method with T <: SpecificRecordBase
      serialize(source, topic)  
      // the same logic
  }

  def createResultStreamString(topic: String, source: DataStream[KafkaSourceType]): DataStream[String] = {
      // dont contain calling java method with T <: SpecificRecordBase
      // the same logic
  }

  def processSpecificRecordBase[T <: SpecificRecordBase](...)(implicit tag: ClassTag[T]): Unit = {
      val stream = createStreamSpecificRecordBase(topic, source)
      // the other the same process actions
  }

  def processString(...): Unit = {
      val stream = createResultStreamString(topic, source)
      // the other the same process actions
  }

我想去掉process方法中的重复代码,写成single。你能写下怎么做吗?是否可以在泛型类型中使用类似 OR 的东西?

  def process[T <: SpecificRecordBase ?OR? String](...)(implicit tag: ClassTag[T]): Unit = {
      val stream = createStream(topic, source)

      this match {
          case SpecificRecordBaseRunner() => createStreamSpecificRecordBase(topic, source)
          case StringRunner()             => createStreamString(topic, source)
      }
      // the other process actions
  }

不幸的是,目前没有您描述的 OR 类型运算符。好消息是,Union Types 功能是即将推出的 Dotty 的一部分: https://dotty.epfl.ch/docs/reference/new-types/union-types.html

目前此类问题通常通过以下方法解决:

求和类型

您可以声明 sealed trait 这将包含所有可能的实例。 请查看下一个代码示例以获取更多详细信息:

// This is your base of sum type
sealed trait TopicResult

//Declared instances which you need to handle
case class RecordResult[T <: SpecificRecordBase](tag: ClassTag[T]) extends TopicResult
case object StringResult extends TopicResult

//Helper methods goes here for convenience 
object TopicResult {
    def record[T <: SpecificRecordBase](implicit tag: ClassTag[T]): TopicResult = RecordResult(tag)
    def string: TopicResult = StringResult
  }

def process(result: TopicResult /*other arguments*/): Unit = {
  // This pattern matching should be safe, because you know all instances in advance.
  result match {
    case RecordResult(tag) => createStreamSpecificRecordBase(topic, source)
    case StringResult => createStreamString(topic, source)
  }
  /// other operations
}

临时多态

也称为Type Class 模式。您可以根据其类型提取行为。这是一种非常流行的方法,您可能会在 catscirce 等库中看到它。 请参阅下面的代码示例以获取更多详细信息:

// Type dependent logic interface
trait CreateTopic[T] {
 def createTopic(topic: String, source: String): String // put desired result type here
}

object CreateTopic {
  implicit def recordCreateTopic[T <: SpecificRecordBase](implicit tag: ClassTag[T]): CreateTopic[T] = {
    (topic: String, source: String) => ??? // create topic for record base
  }

  implicit val stringCreateTopic: CreateTopic[String] = {
    (topic: String, source: String) => ??? // create topic for record base
  }
}

import CreateTopic._
// implementation will be substituted by compiler based on it's type, from implicit context 
def process[T](/*other arguments*/)(implicit create: CreateTopic[T]): Unit = {
  create.createTopic(topic, source)
  /// other operations
}

对不起,我不是 Kafka 专家,所以如果您发现某些部分从 Kafka 的角度来看没有意义,请指正。

希望对您有所帮助!