Kafka Streams消息消费顺序
Kafka Streams Message Consumption order
该主题包含 10 个分区,这些分区包含各种 IoT 设备每 3 到 4 秒生成的消息。消息中的键是 LocationId,DeviceId.The 值是设备相关的详细信息。
流拓扑部署到 4 个 EC2 实例。该过程必须确定每个设备的最新更新值并分析关键性。
我看到的是,由于消息分布在多个分区中,流消费者看到的是较旧的消息,而且它们没有按顺序排列。
如何确定特定密钥的最新消息?
我在 Kafka 集群上看到以下消息行为 -
L1D1 at 1:00 am - critical=false (P1)
L2D2 at 1:00 am - critical=false (P1)
L1D1 at 1:02 am - critical=**true** (P2)
L2D2 at 1:05 am - critical=false (P1)
L1D1 at 1:03 am - critical=false (P2)
L2D2 at 1:03 am - critical=false (P1)
请注意,在 1:02 设备 D1 有严重警报,但在 1:03 没有。如果流处理消息是1:03,1:02(基于分区的任意随机顺序)
订单无法保证,如何高效判断特定设备的最新消息?
你的流处理策略是什么? KSQL 还是 SDK?
如果你使用 KSQl,你只需要创建一个 stream/table
检查:
https://docs.confluent.io/current/ksql/docs/developer-guide/create-a-table.html
How do I determine the latest message for specific device efficiently since the order is not guaranteed?
Kafka 保证消息在一个主题分区内排序,但不保证跨多个主题分区。您需要做的是确保来自同一设备的消息被发送到同一主题 partition。如果你没有改变 Kafka 的默认设置,你可以通过使用 device-specific 标识符来实现这一点(想想:DeviceId
)。
What I am seeing is that since messages are distributed across multiple partitions, stream consumer sees older messages and they are not in order.
如果您使用像 (LocationId, DeviceId)
这样的复合密钥,那么您将不会按顺序获取同一设备的更新,因为设备的消息分布在 多个分区 因为消息密钥还包括 LocationId
.
The process must determine the latest update value from each of the devices and analyze for criticality. [...] How do I determine the latest message for specific device efficiently since the order is not guaranteed?
在你的情况下,我会将消息密钥从 (LocationId, DeviceId)
更改为 DeviceId
。我们称之为 "stream D".
如果您仍然需要 (LocationId, DeviceId)
的原始分组,您可以随后通过 re-grouping(又名 re-keying 又名 re-partitioning)来自 [= 的流 D 来实现此目的10=] 到 (LocationId, DeviceId)
到一个新的派生流 LD.
该主题包含 10 个分区,这些分区包含各种 IoT 设备每 3 到 4 秒生成的消息。消息中的键是 LocationId,DeviceId.The 值是设备相关的详细信息。
流拓扑部署到 4 个 EC2 实例。该过程必须确定每个设备的最新更新值并分析关键性。
我看到的是,由于消息分布在多个分区中,流消费者看到的是较旧的消息,而且它们没有按顺序排列。
如何确定特定密钥的最新消息?
我在 Kafka 集群上看到以下消息行为 -
L1D1 at 1:00 am - critical=false (P1)
L2D2 at 1:00 am - critical=false (P1)
L1D1 at 1:02 am - critical=**true** (P2)
L2D2 at 1:05 am - critical=false (P1)
L1D1 at 1:03 am - critical=false (P2)
L2D2 at 1:03 am - critical=false (P1)
请注意,在 1:02 设备 D1 有严重警报,但在 1:03 没有。如果流处理消息是1:03,1:02(基于分区的任意随机顺序)
订单无法保证,如何高效判断特定设备的最新消息?
你的流处理策略是什么? KSQL 还是 SDK? 如果你使用 KSQl,你只需要创建一个 stream/table
检查: https://docs.confluent.io/current/ksql/docs/developer-guide/create-a-table.html
How do I determine the latest message for specific device efficiently since the order is not guaranteed?
Kafka 保证消息在一个主题分区内排序,但不保证跨多个主题分区。您需要做的是确保来自同一设备的消息被发送到同一主题 partition。如果你没有改变 Kafka 的默认设置,你可以通过使用 device-specific 标识符来实现这一点(想想:DeviceId
)。
What I am seeing is that since messages are distributed across multiple partitions, stream consumer sees older messages and they are not in order.
如果您使用像 (LocationId, DeviceId)
这样的复合密钥,那么您将不会按顺序获取同一设备的更新,因为设备的消息分布在 多个分区 因为消息密钥还包括 LocationId
.
The process must determine the latest update value from each of the devices and analyze for criticality. [...] How do I determine the latest message for specific device efficiently since the order is not guaranteed?
在你的情况下,我会将消息密钥从 (LocationId, DeviceId)
更改为 DeviceId
。我们称之为 "stream D".
如果您仍然需要 (LocationId, DeviceId)
的原始分组,您可以随后通过 re-grouping(又名 re-keying 又名 re-partitioning)来自 [= 的流 D 来实现此目的10=] 到 (LocationId, DeviceId)
到一个新的派生流 LD.