如何将 KeyValue 列表发送到 Kafka?

How to send a KeyValue list to Kafka?

我正在尝试在 Kafka Streams 应用程序中向主题发送列表 [KeyValue]。但是流需要一个 KeyValue。如何将键值发送到流,而不是整个列表?

class MainTopology {

  val createTopology = (sourceTopic: String, sinkTopic: String) => {
    val builder = new StreamsBuilder()
    builder.stream(sourceTopic)
      .map[String, Option[JsonMessage]]((key, value) => toJsonEvent(key, value))
      .map[String, String]((key, value) => toFormattedEvents(key, value))
      .to(sinkTopic)
    builder.build()
  }

  private val toJsonEvent = (key: String, value: String) => {
    println(value)
    val jsonEventsAsCaseClasses = jsonToClass(value)
    new KeyValue(key, jsonEventsAsCaseClasses)
  }

  private val toFormattedEvents = (key: String, value: Option[JsonMessage]) => {
    val jsonEvents: List[String] = formatEvents(value)
    jsonEvents.map(event => new KeyValue(key,event))
  }

}

第二张地图因此没有编译。

Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]

我更新了我的代码,但现在又抛出另一个错误:

    val builder = new StreamsBuilder()
    builder.stream(sourceTopic)
      .map[String, Option[JsonMessage]]((key, value) => toJsonEvent(key, value))
      .flatMap(
      (key, value) => toFormattedEvents(key, value)
    )
      .to(sinkTopic)
    builder.build()
  }

  private val toJsonEvent = (key: String, value: String) => {
    println(value)
    val jsonEventsAsCaseClasses = jsonToClass(value)
    new KeyValue(key, jsonEventsAsCaseClasses)
  }

  private val toFormattedEvents = (key: String, value: Option[JsonMessage]) => {
    val jsonEvents: List[String] = formatEvents(value)
    jsonEvents.map(event => new KeyValue(key,event)).toIterable.asJava
  }
Error:(15, 8) inferred type arguments [?1,?0] do not conform to method flatMap's type parameter bounds [KR,VR]
      .flatMap(
Error:(16, 20) type mismatch;
 found   : org.apache.kafka.streams.kstream.KeyValueMapper[String,Option[org.minsait.streams.model.JsonMessage],Iterable[org.apache.kafka.streams.KeyValue[String,String]]]
 required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: Option[org.minsait.streams.model.JsonMessage], _ <: Iterable[_ <: org.apache.kafka.streams.KeyValue[_ <: KR, _ <: VR]]]
      (key, value) => toFormattedEvents(key, value)

看看 flatMap()flatMapValues() 来替换第二个 map()https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/KStream.html

例如:

.flatMap[String, Long]((key, value) => Seq(("foo", 1L), ("bar", 2L)))

如果您出于某种原因确实打算继续使用 map,请参阅下文。

The second map is not compiling due to this.

Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]

对应代码,供参考:

.map[String, String]((key, value) => toFormattedEvents(key, value))

你需要的是:

.map[String, List[KeyValue[KR, VR]]]((key, value) => toFormattedEvents(key, value))

其中 KRVR 分别是键和值类型,如 return 由 toFormattedEvents() 编辑(实际的 return 类型不是从你的问题中清楚)。为此,您还必须为 List[KeyValue[KR, VR]] 类型设置 Serde

让我用几种不同的类型来说明这一点,以便更容易理解方法调用的哪一部分引用了哪种类型。假设我们希望映射输出具有 Integer 的键类型和 List[KeyValue[String, Long]]:

的值类型
.map[Integer, List[KeyValue[String, Long]]]((key, value) => (5, List(KeyValue.pair("foo", 1L), KeyValue.pair("bar", 2L))))

请注意,此示例为每个映射记录分配了相同的值,但这不是示例的重点。