Kafka Streams Windowed Key to Human Readable

Kafka Streams Windowed Key to Human Readable

我正在对 kafka 流进行 window 聚合。 它工作正常并且可以正确聚合。 这是scala中的代码。 CallRecord 是一个案例 class.

    builder
  .stream[String, String](input_topic)
  .mapValues((elm) => {
    parse(elm).extract[CallRecord]
  })
  .groupBy((key, value) => {
    value.agentId
  })
  .windowedBy(every15Minute)
  .aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
    CallRecordAggByAgent(
      callRecord.agentId,
      ((callRecord.durationMinutes + aggregator.durationMinutesAvg) / aggregator.count).roundD,
      ((callRecord.waitingMinutes + aggregator.waitingMinutesAvg) / aggregator.count).roundD,
      ((callRecord.customerScore + aggregator.scoreAvg) / aggregator.count).roundD,
      aggregator.count + 1
    )
  })
  .mapValues((elm) => {
    write(elm)
  })
  .toStream
  .to(output_topic)

在输出主题中,我看到了类似这样的关键内容。

当我尝试从 KSQLDB 中读取此内容时,当我创建关于此主题的流时,我看到了这样的 rowkey 值 3w�H�@ 我知道这是反序列化问题,但我希望能够直接在 KSQL 中反序列化它,或者在流式传输到 output_topic 时将其设为毫秒长。 我的理解是这应该很容易实现,但我想我在这里遗漏了一些细微差别。

我给出的解决方案如下。显然不是很难。

import io.circe.generic.auto._
import org.json4s._
import org.json4s.native.Serialization.write

builder
      .stream[String, String](args.INPUT_TOPIC)
      .mapValues((elm) => {
        parse(elm).extract[CallRecord]
      })
      .groupBy((key, value) => {
        value.agentId
      })
      .windowedBy(every15Minute)
      .aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
        CallRecordAggByAgent(
          callRecord.agentId,
          ((callRecord.durationMinutes + aggregator.durationMinutesAvg) / aggregator.count).roundD,
          ((callRecord.waitingMinutes + aggregator.waitingMinutesAvg) / aggregator.count).roundD,
          ((callRecord.customerScore + aggregator.scoreAvg) / aggregator.count).roundD,
          aggregator.count + 1
        )
      })
      .mapValues((elm) => {
        write(elm)
      })
      .toStream
      .selectKey((k, v) => {
        s"${k.key()}@${k.window().startTime().toEpochMilli.toString}"
      })
      .to(args.OUTPUT_TOPIC)

selectKey 提供了更改分组键的可能性, 所以在流式传输到输出主题之前,我从密钥中提取时间戳并将其作为字符串。