Kafka 流计数在 scala 中不同吗?

Kafka stream count distinct in scala?

我需要在 kafka 流中对每个用户进行不同的计数。这是我的初始实现,但在 aggregate

上有错误
Required: [Seq[String], mutableHashSet[String]]
Found: mutable.HashSet[String]

而且我不确定如何为 mutable.HashSet 提供自定义 serde ...

val totalUniqueCategoriesCounts: KTable[String, Int] = inputStream
    .filter((_ , ev) => ev.eventData.evData.pageType.isDefined)
    .groupBy((_, ev) => ev.eventData.custData.customerUid.get)
    .aggregate(initializer = mutable.HashSet[String])(
      (aggKey: String, newValue: Event, aggValue: mutable.HashSet[String]) => {
        val cat: String = newValue.eventData.cntData.contentCategory.get
        aggValue += cat
        aggValue
      }, **Serde Here?**)
    .mapValues((set: mutable.HashSet[String]) => set.size)
    //.count()
  totalUniqueCategoriesCounts.toStream.to("total_unique_categories")

如有任何帮助,我们将不胜感激。

我也很关心性能。这是在 kafka 流中进行不同计数的最佳方法吗?

更新 修复了代码问题,但仍然担心这个(如果有的话)的性能影响或做同样事情的任何更好的方法。

我好像只忘记了 mutable.HashSet[String] 之后的 (),所以应该是

val totalUniqueCategoriesCounts: KTable[String, Int] = inputStream
    .filter((_ , ev) => ev.eventData.evData.pageType.isDefined)
    .groupBy((_, ev) => ev.eventData.custData.customerUid.get)
    .aggregate(initializer = mutable.HashSet[String]())(
      (aggKey: String, newValue: Event, aggValue: mutable.HashSet[String]) => {
        val cat: String = newValue.eventData.cntData.contentCategory.get
        aggValue += cat
        aggValue
      })
    .mapValues((set: mutable.HashSet[String]) => set.size)
    //.count()
  totalUniqueCategoriesCounts.toStream.to("total_unique_categories")

Intellij 完全让我失望了:/

性能问题仍然存在,非常感谢任何意见。