使用 ValueTransformerWithKey 计算平均值或者我可以使用 Kafka 的聚合函数

Calculating an average using a ValueTransformerWithKey Or can I use Kafka's aggregation functions

我有一个对象流,我想在其中计算该对象中某个字段的平均值,然后将该平均值保存回该对象。我想要 5 分钟的翻滚 window,并保留 1 小时。我是 Kafka 的新手,所以我想知道这是否是解决问题的正确方法。

首先,我创建一个持久存储:

StoreBuilder<WindowStore<String, Double>> averagesStoreSupplier =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(WINDOW_STORE_NAME, Duration.ofHours(1), Duration.ofMinutes(5), true),
        Serdes.String(),
        Serdes.Double());

streamsBuilder.addStateStore(averagesStoreSupplier);

然后我调用我的变压器使用:

otherKTable
    .leftJoin(objectKTable.transformValues(new AveragingTransformerSupplier(WINDOW_STORE_NAME), WINDOW_STORE_NAME), 
            myValueJoiner)
    .to("outputTopic")

这是我的变压器:

public class AveragingTransformerSupplier implements ValueTransformerWithKeySupplier<String, MyObject, MyObject> {

    private final String stateStoreName;

    public TelemetryAveragingTransformerSupplier(final String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    public ValueTransformerWithKey<String, MyObject, MyObject> get() {
        return new ValueTransformerWithKey<>() {

            private WindowStore<String, Double> averagesStore;

            @Override
            public void init(ProcessorContext processorContext) {
                averagesStore = Try.of(() ->(WindowStore<String, Double>) processorContext.getStateStore(stateStoreName)).getOrElse((WindowStore<String, Double>)null);
            }

            @Override
            public MyObject transform(String s, MyObject myObject) {
                if (averagesStore != null) {
                    averagesStore.put(s, myObject.getNumber());

                    Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
                    Instant timeTo = Instant.now();
                    WindowStoreIterator<Double> itr = averagesStore.fetch(s, timeFrom, timeTo);

                    double sum = 0.0;
                    int size = 0;
                    while(itr.hasNext()) {
                        KeyValue<Long, Double> next = itr.next();
                        size++;
                        sum += next.value;
                    }

                    myObject.setNumber(sum / size);

                }

                return myObject;
            }

            @Override
            public void close() {
                if (averagesStore != null) {
                    averagesStore.flush();
                }
            }
        };
    }
}

我有几个问题。 首先,我定义 WindowStore 的方式是否是形成翻滚 window 的正确方式?我将如何创建跳跃 window?

其次,在我的变压器里,我从商店里得到了从一开始到现在的所有物品。由于我将其定义为 5 分钟 window 和 1 小时保留,这是否意味着商店中的项目是 5 分钟价值数据的快照?保留在这里做什么?

我有这个工作在琐碎的情况下,但不确定是否有更好的方法使用聚合和连接来做到这一点,或者即使我这样做是正确的。此外,我还必须在 try catch 中围绕获取商店的检索,因为 init 被多次调用,有时我会得到 Processor has no access to StateStore 异常。

对于此用例,我建议使用 DSL 而不是处理器 API。比照。 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns 了解详情。

I have a couple of questions. First, is the way I define the WindowStore the correct way to form a tumbling window? How would I create a hopping window?

windowed 存储可用于跳跃或翻滚 window -- 这取决于您如何在处理器中使用,而不是如何你创建商店,你得到什么window语义。

Second, inside my transformer I get all the items from the store from the beginning of time to now. Since I defined it as a 5 minute window and 1 hour retention does that mean that the items in the store is a snapshot of 5 minutes worth of data? What does the retention do here?

创建商店时的参数 windowSize 未按预期方式工作。您需要使用 put(key, value, windowStartTimestamp)Transformer 代码中手动编写 windowing 逻辑 - atm,您使用的 put(key, value) 使用 context.timestamp(),即,当前记录时间戳,如 windowStartTimestamp——我怀疑这就是您想要的。保留时间基于window时间戳,即旧windows过期后将被删除。