Kafka 比较键的连续值

Kafka compare consecutive values for a key

我们正在构建一个从传感器获取数据的应用程序。数据流式传输到 Kafka,消费者将从那里将其发布到不同的数据存储。每个数据点将具有多个表示传感器状态的属性。

在其中一个消费者中,我们希望仅当值发生变化时才将数据发布到数据存储。例如如果有温度传感器每 10 秒轮询一次数据,我们预计会收到类似

的数据
----------------------------------------------------------------------
Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:50", temperature: 11}

在上述情况下,只应发布第一条记录和第三条记录。

为此,我们需要一些方法来比较键的当前值与具有相同键的先前值。我相信 KTable 或 KStream 应该可以做到这一点,但找不到示例。

任何帮助都会很棒!

如果您想使用 Kafka Streams 执行此操作,则必须使用处理器 API。

您需要使用 State store 实现自定义 Transformer。 对于每条消息,如果它已更改或不存在,您应该在 State store 中搜索值,您应该 return 新值,否则为空。除此之外,您还应该将该值保存在状态存储中 (KeyValueStore::put(...))

可以找到有关处理器 API 的更多信息:here

您可以使用 Kafka 流 Processor API。您可以将本地键值存储设置为状态上下文。为每条获取的记录调用处理函数。

在流程函数中,您可以检查最后存储的值并根据业务逻辑接受或拒绝最新记录(在您的情况下比较温度值)。

在标点函数中,您可以按计划将记录转发给消费者。请参阅下面的示例代码(没有标点符号)

public class SensorProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // retrieve the key-value store named "SensorData"
        kvStore = (KeyValueStore) context.getStateStore("SensorData");

        // schedule a punctuate() method every second based on event-time
      
    }

    @Override
    public void process(String sensorName, String sensorData) {
      
        String oldValue = this.kvStore.get(sensorName);

        if (oldValue == null) {
            this.kvStore.put(sensorName, sensorData);
        } else {
            //Put the business logic for comparison
            //compare temperatures
            //if required put the value
            this.kvStore.put(sensorName, sensorData);

            //Forward it o consumer
            context.forward(sensorName, sensorData);
        }
        context.commit();
    }

    @Override
    public void close() {
        // nothing to do
    }
}

这是一个如何用 KStream#transformValues() 解决这个问题的例子。

StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                Serdes.String(),
                                YourValueTypeSerde());
builder.addStateStore(keyValueStoreBuilder);
stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), YourValueTypeSerde()))
    .transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
        private KeyValueStore<String, YourValueType> state;

        @Override
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);}

        @Override
        public YourValueType transform(final String key, final YourValueType value) {
            YourValueType prevValue = state.get(key);
            if (prevValue != null) {
                if (prevValue.temperature() != value.temperature()) {
                    return prevValue;
                }
            } else {
                state.put(key, value);
            }
            return null;
       }

       @Override
       public void close() {}
    }, stateStorName))
    .to(OUTPUT_TOPIC);

您将该记录与存储在状态存储中的先前记录进行比较。如果温度不同,您 return 来自状态存储的记录并将当前记录存储在状态存储中。如果温度相等,则丢弃当前记录。