有没有办法用 Kafka-Stream 重新计算每条消息的平均值?

Is there a way to recalculate the average at every message with Kafka-Stream?

我正在尝试对由功能组成的 Kafka 消息的 周期 进行统计。当我收到同一周期的新消息时,我想重新计算此功能的峰度

我目前能够对流式传输的每条消息执行简单的聚合(加法、计数)。

  //set message count as new key (instead of String null)
  val newStream : KStream[Int, Message] = builder.stream[String, Message]("queueing.sensors.data" )(consumed).map((_,v) => (v.msg_count,v))

  //stream ->  ktable
  newStream.to("Dummy-ReduceInputTopic")(produced2)
  val cycleTable : KTable[Int, Message] = builder.table("Dummy-ReduceInputTopic")

  //aggregate values
  val cycleTable2 : KTable[Int, Seq[Message]] = cycleTable
    .groupBy((k, v) => (v.cycle,v))(serializedFromSerde(intSerde,messageSerde))
    .aggregate[Seq[Message]](Seq[Message]())((aggkey,newvalue,aggvalue) => aggvalue :+ newvalue, (aggkey,newvalue,aggvalue) => aggvalue)(materializedFromSerde(intSerde,seqmesageSerde))

  //create messageList objects => apply predictions
  val cycleTable3 : KStream[Int, Double] = cycleTable2.toStream.map((k,v) => (k,MessageList(v.toSeq).skewness_ps1))

Kafka 中是否有等同于 Spark Streaming Sliding Windows 的东西?

我应该为这个用例放弃 Kafka Stream 吗?对于 Spark 流媒体?

在此先感谢您的关注。

在 Kafka 中你也有 Windowing 的概念: https://docs.confluent.io/current/streams/concepts.html#windowing

这里有一些例子:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?

Kafka 流示例

KTable<Windowed<Key>, Value> fifteenMinuteWindowed = 

    fiveMinuteWindowed

    .groupBy( (windowedKey, value) -> 
        new KeyValue<>(
            new Windowed<>(
                windowedKey.key(),
                new Window<>(
                    windowedKey.window().start() /1000/60/15 *1000*60*15,
                    windowedKey.window().start() /1000/60/15 *1000*60*15 + 1000*60*15
                    // the above rounds time down to a timestamp divisible by 15 minutes
                )
            ),
            value
        ),
        /* your key serde */, 
        /* your value serde */
    )
    .reduce(/*your adder*/, /*your subtractor*/, "store15m");

您还可以考虑 KSQL,它具有以下概念:

跳跃 Window 基于时间的固定持续时间,重叠 windows

翻滚 Window 基于时间的固定持续时间,不重叠,无间隙 windows

会话 Window 基于会话的动态大小、非重叠、数据驱动 windows

KSQL 示例:

SELECT regionid, COUNT(*) FROM pageviews
  WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
  WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6'
  GROUP BY regionid;