Kafka 比较键的连续值
Kafka compare consecutive values for a key
我们正在构建一个从传感器获取数据的应用程序。数据流式传输到 Kafka,消费者将从那里将其发布到不同的数据存储。每个数据点将具有多个表示传感器状态的属性。
在其中一个消费者中,我们希望仅当值发生变化时才将数据发布到数据存储。例如如果有温度传感器每 10 秒轮询一次数据,我们预计会收到类似
的数据
----------------------------------------------------------------------
Key Value
----------------------------------------------------------------------
Sensor1 {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1 {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1 {timestamp: "10-10-2019 10:20:50", temperature: 11}
在上述情况下,只应发布第一条记录和第三条记录。
为此,我们需要一些方法来比较键的当前值与具有相同键的先前值。我相信 KTable 或 KStream 应该可以做到这一点,但找不到示例。
任何帮助都会很棒!
如果您想使用 Kafka Streams 执行此操作,则必须使用处理器 API。
您需要使用 State store 实现自定义 Transformer
。
对于每条消息,如果它已更改或不存在,您应该在 State store 中搜索值,您应该 return 新值,否则为空。除此之外,您还应该将该值保存在状态存储中 (KeyValueStore::put(...)
)
可以找到有关处理器 API 的更多信息:here
您可以使用 Kafka 流 Processor API。您可以将本地键值存储设置为状态上下文。为每条获取的记录调用处理函数。
在流程函数中,您可以检查最后存储的值并根据业务逻辑接受或拒绝最新记录(在您的情况下比较温度值)。
在标点函数中,您可以按计划将记录转发给消费者。请参阅下面的示例代码(没有标点符号)
public class SensorProcessor implements Processor<String, String> {
private ProcessorContext context;
private KeyValueStore<String, String> kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
// keep the processor context locally because we need it in punctuate() and commit()
this.context = context;
// retrieve the key-value store named "SensorData"
kvStore = (KeyValueStore) context.getStateStore("SensorData");
// schedule a punctuate() method every second based on event-time
}
@Override
public void process(String sensorName, String sensorData) {
String oldValue = this.kvStore.get(sensorName);
if (oldValue == null) {
this.kvStore.put(sensorName, sensorData);
} else {
//Put the business logic for comparison
//compare temperatures
//if required put the value
this.kvStore.put(sensorName, sensorData);
//Forward it o consumer
context.forward(sensorName, sensorData);
}
context.commit();
}
@Override
public void close() {
// nothing to do
}
}
这是一个如何用 KStream#transformValues()
解决这个问题的例子。
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
Serdes.String(),
YourValueTypeSerde());
builder.addStateStore(keyValueStoreBuilder);
stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), YourValueTypeSerde()))
.transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
private KeyValueStore<String, YourValueType> state;
@Override
public void init(final ProcessorContext context) {
state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);}
@Override
public YourValueType transform(final String key, final YourValueType value) {
YourValueType prevValue = state.get(key);
if (prevValue != null) {
if (prevValue.temperature() != value.temperature()) {
return prevValue;
}
} else {
state.put(key, value);
}
return null;
}
@Override
public void close() {}
}, stateStorName))
.to(OUTPUT_TOPIC);
您将该记录与存储在状态存储中的先前记录进行比较。如果温度不同,您 return 来自状态存储的记录并将当前记录存储在状态存储中。如果温度相等,则丢弃当前记录。
我们正在构建一个从传感器获取数据的应用程序。数据流式传输到 Kafka,消费者将从那里将其发布到不同的数据存储。每个数据点将具有多个表示传感器状态的属性。
在其中一个消费者中,我们希望仅当值发生变化时才将数据发布到数据存储。例如如果有温度传感器每 10 秒轮询一次数据,我们预计会收到类似
的数据----------------------------------------------------------------------
Key Value
----------------------------------------------------------------------
Sensor1 {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1 {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1 {timestamp: "10-10-2019 10:20:50", temperature: 11}
在上述情况下,只应发布第一条记录和第三条记录。
为此,我们需要一些方法来比较键的当前值与具有相同键的先前值。我相信 KTable 或 KStream 应该可以做到这一点,但找不到示例。
任何帮助都会很棒!
如果您想使用 Kafka Streams 执行此操作,则必须使用处理器 API。
您需要使用 State store 实现自定义 Transformer
。
对于每条消息,如果它已更改或不存在,您应该在 State store 中搜索值,您应该 return 新值,否则为空。除此之外,您还应该将该值保存在状态存储中 (KeyValueStore::put(...)
)
可以找到有关处理器 API 的更多信息:here
您可以使用 Kafka 流 Processor API。您可以将本地键值存储设置为状态上下文。为每条获取的记录调用处理函数。
在流程函数中,您可以检查最后存储的值并根据业务逻辑接受或拒绝最新记录(在您的情况下比较温度值)。
在标点函数中,您可以按计划将记录转发给消费者。请参阅下面的示例代码(没有标点符号)
public class SensorProcessor implements Processor<String, String> {
private ProcessorContext context;
private KeyValueStore<String, String> kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
// keep the processor context locally because we need it in punctuate() and commit()
this.context = context;
// retrieve the key-value store named "SensorData"
kvStore = (KeyValueStore) context.getStateStore("SensorData");
// schedule a punctuate() method every second based on event-time
}
@Override
public void process(String sensorName, String sensorData) {
String oldValue = this.kvStore.get(sensorName);
if (oldValue == null) {
this.kvStore.put(sensorName, sensorData);
} else {
//Put the business logic for comparison
//compare temperatures
//if required put the value
this.kvStore.put(sensorName, sensorData);
//Forward it o consumer
context.forward(sensorName, sensorData);
}
context.commit();
}
@Override
public void close() {
// nothing to do
}
}
这是一个如何用 KStream#transformValues()
解决这个问题的例子。
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
Serdes.String(),
YourValueTypeSerde());
builder.addStateStore(keyValueStoreBuilder);
stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), YourValueTypeSerde()))
.transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
private KeyValueStore<String, YourValueType> state;
@Override
public void init(final ProcessorContext context) {
state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);}
@Override
public YourValueType transform(final String key, final YourValueType value) {
YourValueType prevValue = state.get(key);
if (prevValue != null) {
if (prevValue.temperature() != value.temperature()) {
return prevValue;
}
} else {
state.put(key, value);
}
return null;
}
@Override
public void close() {}
}, stateStorName))
.to(OUTPUT_TOPIC);
您将该记录与存储在状态存储中的先前记录进行比较。如果温度不同,您 return 来自状态存储的记录并将当前记录存储在状态存储中。如果温度相等,则丢弃当前记录。