如果重新分区的密钥与状态存储密钥不同,Kstreams 应用程序的多个实例是否会处理有状态数据?
Will multiple instances of Kstreams apps process stateful data if the repartitioned key isn't the same as the state store key?
示例:我有一个乘客流和一个公共汽车流。乘客在他们的流中有一个 ID 和一个 gps 值。 Bus 流有一个 ID 和一个 passengerID(每次有新乘客上车时,都会在总线主题上放置一条新消息,并在其有效负载中包含巴士 ID 和 passenger ID)。
我想计算每辆公共汽车的里程 - 基于 "primary" 乘客的 GPS 值(他们拥有最可靠的 GPS 数据)。主要乘客会不时更改。
我需要确保在不同的流式应用程序实例中不会对同一辆巴士(和不同的乘客)进行多次距离计算。
我的问题:
是否始终将特定 busID 的乘客数据发送到同一分区,以确保以前和当前的乘客消息不会被发送到不同的分区,从而发送到不同的流实例 - 这将导致 GPS 无效距离计算?
passengerStream.map((k, v) -> new KeyValue<>(v.getPassengerId(), v))
.join(busTable, busPassengerStatus::new,
Joined.with(Serdes.String(), passengerStreamSerde, busPassengerJsonSerDe))
.map((k, v) -> new KeyValue<>(v.getBusId(), v))
.transform(distanceProcessorSupplier, calculatedDistanceStoreSupplier.name(), previousPassengerStateStoreSupplier.name())
//.print(Printed.toSysOut())
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), new JsonSerDe<>(CalculatedDistance.class)));
详情
有一个公交车 KTable,其键为 passengerID,值为 BusId,一个 State Store 保存了 "primary" 乘客的先前记录(以计算两点之间的距离)和一个滚动距离状态存储,它采用以前的距离并根据当前和以前的 GPS 添加新距离。
数据根据 "partitioning strategy" 进行分区。默认情况下,消息按其键进行分区。因此,如果您不指定不同的分区程序,则可以在消息键中设置 busId,并且同一总线的所有记录都将转到同一分区。
如果您不能将 busId 设置为消息键,您可以实现一个自定义 Partitioner
并使用您的自定义分区程序配置您的生产者,该分区程序可以从值中提取 busId 并计算消息应该是的相应分区写给。作为第三种选择,当您创建 ProducerRecord
时,您可以明确指定其分区(对于这种情况,Partitioner
不会用于计算分区)。
示例:我有一个乘客流和一个公共汽车流。乘客在他们的流中有一个 ID 和一个 gps 值。 Bus 流有一个 ID 和一个 passengerID(每次有新乘客上车时,都会在总线主题上放置一条新消息,并在其有效负载中包含巴士 ID 和 passenger ID)。
我想计算每辆公共汽车的里程 - 基于 "primary" 乘客的 GPS 值(他们拥有最可靠的 GPS 数据)。主要乘客会不时更改。
我需要确保在不同的流式应用程序实例中不会对同一辆巴士(和不同的乘客)进行多次距离计算。
我的问题:
是否始终将特定 busID 的乘客数据发送到同一分区,以确保以前和当前的乘客消息不会被发送到不同的分区,从而发送到不同的流实例 - 这将导致 GPS 无效距离计算?
passengerStream.map((k, v) -> new KeyValue<>(v.getPassengerId(), v))
.join(busTable, busPassengerStatus::new,
Joined.with(Serdes.String(), passengerStreamSerde, busPassengerJsonSerDe))
.map((k, v) -> new KeyValue<>(v.getBusId(), v))
.transform(distanceProcessorSupplier, calculatedDistanceStoreSupplier.name(), previousPassengerStateStoreSupplier.name())
//.print(Printed.toSysOut())
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), new JsonSerDe<>(CalculatedDistance.class)));
详情
有一个公交车 KTable,其键为 passengerID,值为 BusId,一个 State Store 保存了 "primary" 乘客的先前记录(以计算两点之间的距离)和一个滚动距离状态存储,它采用以前的距离并根据当前和以前的 GPS 添加新距离。
数据根据 "partitioning strategy" 进行分区。默认情况下,消息按其键进行分区。因此,如果您不指定不同的分区程序,则可以在消息键中设置 busId,并且同一总线的所有记录都将转到同一分区。
如果您不能将 busId 设置为消息键,您可以实现一个自定义 Partitioner
并使用您的自定义分区程序配置您的生产者,该分区程序可以从值中提取 busId 并计算消息应该是的相应分区写给。作为第三种选择,当您创建 ProducerRecord
时,您可以明确指定其分区(对于这种情况,Partitioner
不会用于计算分区)。