Kafka Streams Windowed Key to Human Readable
Kafka Streams Windowed Key to Human Readable
我正在对 kafka 流进行 window 聚合。
它工作正常并且可以正确聚合。
这是scala中的代码。
CallRecord
是一个案例 class.
builder
.stream[String, String](input_topic)
.mapValues((elm) => {
parse(elm).extract[CallRecord]
})
.groupBy((key, value) => {
value.agentId
})
.windowedBy(every15Minute)
.aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
CallRecordAggByAgent(
callRecord.agentId,
((callRecord.durationMinutes + aggregator.durationMinutesAvg) / aggregator.count).roundD,
((callRecord.waitingMinutes + aggregator.waitingMinutesAvg) / aggregator.count).roundD,
((callRecord.customerScore + aggregator.scoreAvg) / aggregator.count).roundD,
aggregator.count + 1
)
})
.mapValues((elm) => {
write(elm)
})
.toStream
.to(output_topic)
在输出主题中,我看到了类似这样的关键内容。
当我尝试从 KSQLDB 中读取此内容时,当我创建关于此主题的流时,我看到了这样的 rowkey 值 3w�H�@
我知道这是反序列化问题,但我希望能够直接在 KSQL 中反序列化它,或者在流式传输到 output_topic 时将其设为毫秒长。
我的理解是这应该很容易实现,但我想我在这里遗漏了一些细微差别。
我给出的解决方案如下。显然不是很难。
import io.circe.generic.auto._
import org.json4s._
import org.json4s.native.Serialization.write
builder
.stream[String, String](args.INPUT_TOPIC)
.mapValues((elm) => {
parse(elm).extract[CallRecord]
})
.groupBy((key, value) => {
value.agentId
})
.windowedBy(every15Minute)
.aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
CallRecordAggByAgent(
callRecord.agentId,
((callRecord.durationMinutes + aggregator.durationMinutesAvg) / aggregator.count).roundD,
((callRecord.waitingMinutes + aggregator.waitingMinutesAvg) / aggregator.count).roundD,
((callRecord.customerScore + aggregator.scoreAvg) / aggregator.count).roundD,
aggregator.count + 1
)
})
.mapValues((elm) => {
write(elm)
})
.toStream
.selectKey((k, v) => {
s"${k.key()}@${k.window().startTime().toEpochMilli.toString}"
})
.to(args.OUTPUT_TOPIC)
selectKey
提供了更改分组键的可能性,
所以在流式传输到输出主题之前,我从密钥中提取时间戳并将其作为字符串。
我正在对 kafka 流进行 window 聚合。
它工作正常并且可以正确聚合。
这是scala中的代码。
CallRecord
是一个案例 class.
builder
.stream[String, String](input_topic)
.mapValues((elm) => {
parse(elm).extract[CallRecord]
})
.groupBy((key, value) => {
value.agentId
})
.windowedBy(every15Minute)
.aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
CallRecordAggByAgent(
callRecord.agentId,
((callRecord.durationMinutes + aggregator.durationMinutesAvg) / aggregator.count).roundD,
((callRecord.waitingMinutes + aggregator.waitingMinutesAvg) / aggregator.count).roundD,
((callRecord.customerScore + aggregator.scoreAvg) / aggregator.count).roundD,
aggregator.count + 1
)
})
.mapValues((elm) => {
write(elm)
})
.toStream
.to(output_topic)
在输出主题中,我看到了类似这样的关键内容。
当我尝试从 KSQLDB 中读取此内容时,当我创建关于此主题的流时,我看到了这样的 rowkey 值 3w�H�@
我知道这是反序列化问题,但我希望能够直接在 KSQL 中反序列化它,或者在流式传输到 output_topic 时将其设为毫秒长。
我的理解是这应该很容易实现,但我想我在这里遗漏了一些细微差别。
我给出的解决方案如下。显然不是很难。
import io.circe.generic.auto._
import org.json4s._
import org.json4s.native.Serialization.write
builder
.stream[String, String](args.INPUT_TOPIC)
.mapValues((elm) => {
parse(elm).extract[CallRecord]
})
.groupBy((key, value) => {
value.agentId
})
.windowedBy(every15Minute)
.aggregate(CallRecordAggByAgent.empty)((_, callRecord, aggregator) => {
CallRecordAggByAgent(
callRecord.agentId,
((callRecord.durationMinutes + aggregator.durationMinutesAvg) / aggregator.count).roundD,
((callRecord.waitingMinutes + aggregator.waitingMinutesAvg) / aggregator.count).roundD,
((callRecord.customerScore + aggregator.scoreAvg) / aggregator.count).roundD,
aggregator.count + 1
)
})
.mapValues((elm) => {
write(elm)
})
.toStream
.selectKey((k, v) => {
s"${k.key()}@${k.window().startTime().toEpochMilli.toString}"
})
.to(args.OUTPUT_TOPIC)
selectKey
提供了更改分组键的可能性,
所以在流式传输到输出主题之前,我从密钥中提取时间戳并将其作为字符串。