Is there a way to recalculate the average at every message with Kafka Streams?
I am trying to compute statistics on cycles of Kafka messages, each message consisting of features. When I receive a new message for the same cycle, I want to recalculate the kurtosis of these features.
For now I am able to perform simple aggregations (sum, count) on every streamed message.
// set message count as new key (instead of String null)
val newStream: KStream[Int, Message] =
  builder.stream[String, Message]("queueing.sensors.data")(consumed)
    .map((_, v) => (v.msg_count, v))

// stream -> ktable
newStream.to("Dummy-ReduceInputTopic")(produced2)
val cycleTable: KTable[Int, Message] = builder.table("Dummy-ReduceInputTopic")

// aggregate values: collect all messages of a cycle into a Seq
val cycleTable2: KTable[Int, Seq[Message]] = cycleTable
  .groupBy((k, v) => (v.cycle, v))(serializedFromSerde(intSerde, messageSerde))
  .aggregate[Seq[Message]](Seq.empty[Message])(
    (aggkey, newvalue, aggvalue) => aggvalue :+ newvalue,
    (aggkey, newvalue, aggvalue) => aggvalue
  )(materializedFromSerde(intSerde, seqmesageSerde))

// create MessageList objects => apply predictions
val cycleTable3: KStream[Int, Double] =
  cycleTable2.toStream.map((k, v) => (k, MessageList(v.toSeq).skewness_ps1))
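The Seq-based aggregate above rebuilds the whole message list and recomputes the statistic from scratch on every update. The same per-message recalculation can also be done with constant-size state by folding running central moments instead of the full sequence. Below is a minimal, Kafka-free sketch; the `Moments` class is illustrative (not from the code above) and uses the standard online update formulas for higher-order moments:

```java
// Running central moments M2..M4 updated one observation at a time
// (standard online/streaming update formulas). After each new value the
// kurtosis can be re-derived, which is what a per-message aggregate needs.
final class Moments {
    long n;
    double mean, m2, m3, m4;

    void add(double x) {
        long n1 = n + 1;
        double delta = x - mean;
        double dn = delta / n1;
        double dn2 = dn * dn;
        double term1 = delta * dn * n;
        // update order matters: each line uses the *previous* values below it
        m4 += term1 * dn2 * (n1 * n1 - 3 * n1 + 3) + 6 * dn2 * m2 - 4 * dn * m3;
        m3 += term1 * dn * (n1 - 2) - 3 * dn * m2;
        m2 += term1;
        mean += dn;
        n = n1;
    }

    // non-excess (population) kurtosis: n * M4 / M2^2
    double kurtosis() {
        return n * m4 / (m2 * m2);
    }
}
```

In a Kafka Streams aggregate this state object would replace `Seq[Message]` as the aggregation value, so each incoming message costs O(1) instead of O(cycle length).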
Is there an equivalent of Spark Streaming's sliding windows in Kafka?
Should I abandon Kafka Streams for this use case in favour of Spark Streaming?
Thanks in advance for your attention.
Kafka also has the concept of windowing:
https://docs.confluent.io/current/streams/concepts.html#windowing
Kafka Streams example:
KTable<Windowed<Key>, Value> fifteenMinuteWindowed =
    fiveMinuteWindowed
        .groupBy((windowedKey, value) ->
            new KeyValue<>(
                new Windowed<>(
                    windowedKey.key(),
                    new Window<>(
                        windowedKey.window().start() / 1000 / 60 / 15 * 1000 * 60 * 15,
                        windowedKey.window().start() / 1000 / 60 / 15 * 1000 * 60 * 15 + 1000 * 60 * 15
                        // the above rounds time down to a timestamp divisible by 15 minutes
                    )
                ),
                value
            ),
            /* your key serde */,
            /* your value serde */
        )
        .reduce(/* your adder */, /* your subtractor */, "store15m");
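The rounding in the comment works because `long` division truncates: dividing an epoch-millis timestamp by the window length and multiplying back snaps it down to the nearest 15-minute boundary. A standalone illustration (the class and method names here are made up for the sketch):

```java
// Snap an epoch-millis timestamp down to the start of its 15-minute
// window using truncating integer division, as in the groupBy above.
final class WindowAlign {
    static final long WINDOW_MS = 15 * 60 * 1000L; // 900000 ms

    static long windowStart(long tsMillis) {
        return tsMillis / WINDOW_MS * WINDOW_MS;
    }

    static long windowEnd(long tsMillis) {
        return windowStart(tsMillis) + WINDOW_MS;
    }
}
```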
You could also consider KSQL, which has the following window concepts:
Hopping windows: time-based, fixed-duration, overlapping windows
Tumbling windows: time-based, fixed-duration, non-overlapping, gap-less windows
Session windows: session-based, dynamically-sized, non-overlapping, data-driven windows
KSQL example:
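To make the distinction concrete, here is a Kafka-free sketch of how one timestamp maps to window start times under these semantics: a tumbling window is simply a hopping window whose advance equals its size, so each record lands in exactly one window, while a smaller advance produces overlapping windows. (The `Hopping` helper and the window sizes used are illustrative.)

```java
import java.util.ArrayList;
import java.util.List;

// Window start times that contain a given timestamp, mirroring hopping-window
// semantics: a new window starts every 'advanceMs' and spans 'sizeMs'.
final class Hopping {
    static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // latest window starting at or before ts
        long last = ts / advanceMs * advanceMs;
        for (long s = last; s > ts - sizeMs; s -= advanceMs) {
            if (s >= 0) starts.add(s); // newest first
        }
        return starts;
    }
}
```

With size 30 s and advance 10 s a record at t = 25 s falls into three overlapping windows ([0,30), [10,40), [20,50)); with size = advance = 10 s (tumbling) it falls into exactly one.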
SELECT regionid, COUNT(*) FROM pageviews
WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
WHERE UCASE(gender)='FEMALE' AND LCASE(regionid) LIKE '%_6'
GROUP BY regionid;