KSQL - 使用 GEO_DISTANCE 计算与 2 条消息的距离

KSQL - calculate distance from 2 messages using GEO_DISTANCE

我有一个 kafka 主题,主题中的每条消息都有 lat/lon 和事件时间戳。创建了一个引用主题的流,并想使用 geo_distance 计算 2 点之间的距离。 示例

GpsDateTime            lat              lon
2016-11-30 22:38:36,    32.685757,  -96.735942
2016-11-30 22:39:07,    32.687347,  -96.732841
2016-11-30 22:39:37,    32.68805,   -96.729726 

我想在上面的流上创建一个新的流,并用距离丰富它。

GpsDateTime            lat              lon          Distance
2016-11-30 22:38:36,    32.685757,  -96.735942        0
2016-11-30 22:39:07,    32.687347,  -96.732841        0.340
2016-11-30 22:39:37,    32.68805,   -96.729726        0.302

是否可以使用 KSQL 达到预期的结果?或者如何在处理新消息时参考之前的消息?

首先,这些读数是否来自某种设备?如果是这样,您是否有他们的唯一 ID (UUID)?我会把它放到你的信息流中,所以它喜欢 UUID, GpsDateTime, lat, lon

您将需要创建一个相当基本的 Kafka Streams 应用程序。在此应用程序中,您会将流中的最新阅读存储到 StoreBuilder 中。然后,当从 Kafka 收到一条新消息时,您将检索这个最新值,进行计算,然后将新的经纬度值存储到 StoreBuilder 中。

当然,我不清楚您是否只想 ever 有一个经纬度值,并且所有后续值都是根据第一个读数计算得出的。或者,如果您想进行滚动计算,您总是比较上次读数和当前读数之间的距离。

无论如何,您可以在实践中看到这段代码:https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java

此示例是字数统计示例,但可以针对您的用例快速转换。

静态最终 class WordCountTransformerSupplier(第 78 行)将成为您的 LatLongDistanceComputation。

您将创建具有适当类型的 StoreBuilder(第 154 行)(无论您将 lat/lon 存储为什么)。

第 165 行是实际从流入的值流中读取项目的地方。

当然,您还需要编辑 inputTopic 和 outputTopic(第 66-67 行)以及其他一些内容。