使用 ValueTransformerWithKey 计算平均值或者我可以使用 Kafka 的聚合函数
Calculating an average using a ValueTransformerWithKey Or can I use Kafka's aggregation functions
我有一个对象流,我想在其中计算该对象中某个字段的平均值,然后将该平均值保存回该对象。我想要 5 分钟的翻滚 window,并保留 1 小时。我是 Kafka 的新手,所以我想知道这是否是解决问题的正确方法。
首先,我创建一个持久存储:
StoreBuilder<WindowStore<String, Double>> averagesStoreSupplier =
Stores.windowStoreBuilder(
Stores.persistentWindowStore(WINDOW_STORE_NAME, Duration.ofHours(1), Duration.ofMinutes(5), true),
Serdes.String(),
Serdes.Double());
streamsBuilder.addStateStore(averagesStoreSupplier);
然后我调用我的变压器使用:
otherKTable
.leftJoin(objectKTable.transformValues(new AveragingTransformerSupplier(WINDOW_STORE_NAME), WINDOW_STORE_NAME),
myValueJoiner)
.to("outputTopic")
这是我的变压器:
public class AveragingTransformerSupplier implements ValueTransformerWithKeySupplier<String, MyObject, MyObject> {
private final String stateStoreName;
public TelemetryAveragingTransformerSupplier(final String stateStoreName) {
this.stateStoreName = stateStoreName;
}
public ValueTransformerWithKey<String, MyObject, MyObject> get() {
return new ValueTransformerWithKey<>() {
private WindowStore<String, Double> averagesStore;
@Override
public void init(ProcessorContext processorContext) {
averagesStore = Try.of(() ->(WindowStore<String, Double>) processorContext.getStateStore(stateStoreName)).getOrElse((WindowStore<String, Double>)null);
}
@Override
public MyObject transform(String s, MyObject myObject) {
if (averagesStore != null) {
averagesStore.put(s, myObject.getNumber());
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now();
WindowStoreIterator<Double> itr = averagesStore.fetch(s, timeFrom, timeTo);
double sum = 0.0;
int size = 0;
while(itr.hasNext()) {
KeyValue<Long, Double> next = itr.next();
size++;
sum += next.value;
}
myObject.setNumber(sum / size);
}
return myObject;
}
@Override
public void close() {
if (averagesStore != null) {
averagesStore.flush();
}
}
};
}
}
我有几个问题。
首先,我定义 WindowStore 的方式是否是形成翻滚 window 的正确方式?我将如何创建跳跃 window?
其次,在我的变压器里,我从商店里得到了从一开始到现在的所有物品。由于我将其定义为 5 分钟 window 和 1 小时保留,这是否意味着商店中的项目是 5 分钟价值数据的快照?保留在这里做什么?
我有这个工作在琐碎的情况下,但不确定是否有更好的方法使用聚合和连接来做到这一点,或者即使我这样做是正确的。此外,我还必须在 try catch 中围绕获取商店的检索,因为 init 被多次调用,有时我会得到 Processor has no access to StateStore
异常。
对于此用例,我建议使用 DSL 而不是处理器 API。比照。 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns 了解详情。
I have a couple of questions. First, is the way I define the WindowStore the correct way to form a tumbling window? How would I create a hopping window?
windowed 存储可用于跳跃或翻滚 window -- 这取决于您如何在处理器中使用,而不是如何你创建商店,你得到什么window语义。
Second, inside my transformer I get all the items from the store from the beginning of time to now. Since I defined it as a 5 minute window and 1 hour retention does that mean that the items in the store is a snapshot of 5 minutes worth of data? What does the retention do here?
创建商店时的参数 windowSize
未按预期方式工作。您需要使用 put(key, value, windowStartTimestamp)
在 Transformer
代码中手动编写 windowing 逻辑 - atm,您使用的 put(key, value)
使用 context.timestamp()
,即,当前记录时间戳,如 windowStartTimestamp——我怀疑这就是您想要的。保留时间基于window时间戳,即旧windows过期后将被删除。
我有一个对象流,我想在其中计算该对象中某个字段的平均值,然后将该平均值保存回该对象。我想要 5 分钟的翻滚 window,并保留 1 小时。我是 Kafka 的新手,所以我想知道这是否是解决问题的正确方法。
首先,我创建一个持久存储:
StoreBuilder<WindowStore<String, Double>> averagesStoreSupplier =
Stores.windowStoreBuilder(
Stores.persistentWindowStore(WINDOW_STORE_NAME, Duration.ofHours(1), Duration.ofMinutes(5), true),
Serdes.String(),
Serdes.Double());
streamsBuilder.addStateStore(averagesStoreSupplier);
然后我调用我的变压器使用:
otherKTable
.leftJoin(objectKTable.transformValues(new AveragingTransformerSupplier(WINDOW_STORE_NAME), WINDOW_STORE_NAME),
myValueJoiner)
.to("outputTopic")
这是我的变压器:
public class AveragingTransformerSupplier implements ValueTransformerWithKeySupplier<String, MyObject, MyObject> {
private final String stateStoreName;
public TelemetryAveragingTransformerSupplier(final String stateStoreName) {
this.stateStoreName = stateStoreName;
}
public ValueTransformerWithKey<String, MyObject, MyObject> get() {
return new ValueTransformerWithKey<>() {
private WindowStore<String, Double> averagesStore;
@Override
public void init(ProcessorContext processorContext) {
averagesStore = Try.of(() ->(WindowStore<String, Double>) processorContext.getStateStore(stateStoreName)).getOrElse((WindowStore<String, Double>)null);
}
@Override
public MyObject transform(String s, MyObject myObject) {
if (averagesStore != null) {
averagesStore.put(s, myObject.getNumber());
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now();
WindowStoreIterator<Double> itr = averagesStore.fetch(s, timeFrom, timeTo);
double sum = 0.0;
int size = 0;
while(itr.hasNext()) {
KeyValue<Long, Double> next = itr.next();
size++;
sum += next.value;
}
myObject.setNumber(sum / size);
}
return myObject;
}
@Override
public void close() {
if (averagesStore != null) {
averagesStore.flush();
}
}
};
}
}
我有几个问题。 首先,我定义 WindowStore 的方式是否是形成翻滚 window 的正确方式?我将如何创建跳跃 window?
其次,在我的变压器里,我从商店里得到了从一开始到现在的所有物品。由于我将其定义为 5 分钟 window 和 1 小时保留,这是否意味着商店中的项目是 5 分钟价值数据的快照?保留在这里做什么?
我有这个工作在琐碎的情况下,但不确定是否有更好的方法使用聚合和连接来做到这一点,或者即使我这样做是正确的。此外,我还必须在 try catch 中围绕获取商店的检索,因为 init 被多次调用,有时我会得到 Processor has no access to StateStore
异常。
对于此用例,我建议使用 DSL 而不是处理器 API。比照。 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns 了解详情。
I have a couple of questions. First, is the way I define the WindowStore the correct way to form a tumbling window? How would I create a hopping window?
windowed 存储可用于跳跃或翻滚 window -- 这取决于您如何在处理器中使用,而不是如何你创建商店,你得到什么window语义。
Second, inside my transformer I get all the items from the store from the beginning of time to now. Since I defined it as a 5 minute window and 1 hour retention does that mean that the items in the store is a snapshot of 5 minutes worth of data? What does the retention do here?
创建商店时的参数 windowSize
未按预期方式工作。您需要使用 put(key, value, windowStartTimestamp)
在 Transformer
代码中手动编写 windowing 逻辑 - atm,您使用的 put(key, value)
使用 context.timestamp()
,即,当前记录时间戳,如 windowStartTimestamp——我怀疑这就是您想要的。保留时间基于window时间戳,即旧windows过期后将被删除。