Kafka 流计数在 scala 中不同吗?
Kafka stream count distinct in scala?
我需要在 kafka 流中对每个用户进行不同的计数。这是我的初始实现,但在 aggregate
上有错误
Required: [Seq[String], mutableHashSet[String]]
Found: mutable.HashSet[String]
而且我不确定如何为 mutable.HashSet
提供自定义 serde ...
val totalUniqueCategoriesCounts: KTable[String, Int] = inputStream
.filter((_ , ev) => ev.eventData.evData.pageType.isDefined)
.groupBy((_, ev) => ev.eventData.custData.customerUid.get)
.aggregate(initializer = mutable.HashSet[String])(
(aggKey: String, newValue: Event, aggValue: mutable.HashSet[String]) => {
val cat: String = newValue.eventData.cntData.contentCategory.get
aggValue += cat
aggValue
}, **Serde Here?**)
.mapValues((set: mutable.HashSet[String]) => set.size)
//.count()
totalUniqueCategoriesCounts.toStream.to("total_unique_categories")
如有任何帮助,我们将不胜感激。
我也很关心性能。这是在 kafka 流中进行不同计数的最佳方法吗?
更新 修复了代码问题,但仍然担心这个(如果有的话)的性能影响或做同样事情的任何更好的方法。
我好像只忘记了 mutable.HashSet[String]
之后的 ()
,所以应该是
val totalUniqueCategoriesCounts: KTable[String, Int] = inputStream
.filter((_ , ev) => ev.eventData.evData.pageType.isDefined)
.groupBy((_, ev) => ev.eventData.custData.customerUid.get)
.aggregate(initializer = mutable.HashSet[String]())(
(aggKey: String, newValue: Event, aggValue: mutable.HashSet[String]) => {
val cat: String = newValue.eventData.cntData.contentCategory.get
aggValue += cat
aggValue
})
.mapValues((set: mutable.HashSet[String]) => set.size)
//.count()
totalUniqueCategoriesCounts.toStream.to("total_unique_categories")
Intellij 完全让我失望了:/
性能问题仍然存在,非常感谢任何意见。
我需要在 kafka 流中对每个用户进行不同的计数。这是我的初始实现,但在 aggregate
上有错误Required: [Seq[String], mutableHashSet[String]]
Found: mutable.HashSet[String]
而且我不确定如何为 mutable.HashSet
提供自定义 serde ...
val totalUniqueCategoriesCounts: KTable[String, Int] = inputStream
.filter((_ , ev) => ev.eventData.evData.pageType.isDefined)
.groupBy((_, ev) => ev.eventData.custData.customerUid.get)
.aggregate(initializer = mutable.HashSet[String])(
(aggKey: String, newValue: Event, aggValue: mutable.HashSet[String]) => {
val cat: String = newValue.eventData.cntData.contentCategory.get
aggValue += cat
aggValue
}, **Serde Here?**)
.mapValues((set: mutable.HashSet[String]) => set.size)
//.count()
totalUniqueCategoriesCounts.toStream.to("total_unique_categories")
如有任何帮助,我们将不胜感激。
我也很关心性能。这是在 kafka 流中进行不同计数的最佳方法吗?
更新 修复了代码问题,但仍然担心这个(如果有的话)的性能影响或做同样事情的任何更好的方法。
我好像只忘记了 mutable.HashSet[String]
之后的 ()
,所以应该是
val totalUniqueCategoriesCounts: KTable[String, Int] = inputStream
.filter((_ , ev) => ev.eventData.evData.pageType.isDefined)
.groupBy((_, ev) => ev.eventData.custData.customerUid.get)
.aggregate(initializer = mutable.HashSet[String]())(
(aggKey: String, newValue: Event, aggValue: mutable.HashSet[String]) => {
val cat: String = newValue.eventData.cntData.contentCategory.get
aggValue += cat
aggValue
})
.mapValues((set: mutable.HashSet[String]) => set.size)
//.count()
totalUniqueCategoriesCounts.toStream.to("total_unique_categories")
Intellij 完全让我失望了:/
性能问题仍然存在,非常感谢任何意见。