使用 Kafka Stream 中的 State-store(RocksDB) 将一条记录转换为多条记录
Transform one record into more than one record using State-store(RocksDB) in Kafka Stream
我想使用 state-store(RocksDB) 将一条记录转换为多条记录。我知道有一种方法,例如 stream.transform(final TransformerSupplier> transformerSupplier,final String... stateStoreNames) 但是如何 return 多个 KeyValue 对,以便我以后可以使用分支发布到受人尊敬的话题?
有一种方法可以向下游转发数据,但我如何才能再次使用该数据?
Kafka 版本 - 1.1.0
如果要将记录转换为多个键值对,可以使用flatMap
操作。您可以修改键和值并生成零个、一个或多个记录。
KStream<String, Integer> transformed = stream.flatMap(
// Here, we generate two output records for each input record.
// We also change the key and value types.
// Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
(key, value) -> {
List<KeyValue<String, Integer>> result = new LinkedList<>();
result.add(KeyValue.pair(value.toUpperCase(), 1000));
result.add(KeyValue.pair(value.toLowerCase(), 9000));
return result;
}
);
如果我没理解错的话,您想根据状态存储中的数据发出多条记录。使用 Kafka Streams 2.2 之前的 transform()
,您可以通过在 Transformer
中多次调用 context.forward()
来实现。例如:
stream
.transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
private ProcessorContext context;
@Override
public void init(final ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
context.forward(key, value);
context.forward(key + 1, value + 1;)
return null;
}
@Override
public void close() {
}
}, stateStoreName);
请注意,通过使用 context.forward()
,您没有编译时类型安全。如果转发类型不符合输出类型 KStream
(上例中的 <Integer, Integer>
)的记录,代码会编译但会在运行时抛出异常。
从 Kafka Streams 2.2 开始,您可以使用 flatTransform()
。使用 flatTransform()
,您可以 return 记录列表,而不是像上面的示例那样多次使用 context.forward()
和 return null
。以这种方式使用 flatTransform()
保证编译时类型安全。
我想使用 state-store(RocksDB) 将一条记录转换为多条记录。我知道有一种方法,例如 stream.transform(final TransformerSupplier> transformerSupplier,final String... stateStoreNames) 但是如何 return 多个 KeyValue 对,以便我以后可以使用分支发布到受人尊敬的话题?
有一种方法可以向下游转发数据,但我如何才能再次使用该数据?
Kafka 版本 - 1.1.0
如果要将记录转换为多个键值对,可以使用flatMap
操作。您可以修改键和值并生成零个、一个或多个记录。
KStream<String, Integer> transformed = stream.flatMap(
// Here, we generate two output records for each input record.
// We also change the key and value types.
// Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
(key, value) -> {
List<KeyValue<String, Integer>> result = new LinkedList<>();
result.add(KeyValue.pair(value.toUpperCase(), 1000));
result.add(KeyValue.pair(value.toLowerCase(), 9000));
return result;
}
);
如果我没理解错的话,您想根据状态存储中的数据发出多条记录。使用 Kafka Streams 2.2 之前的 transform()
,您可以通过在 Transformer
中多次调用 context.forward()
来实现。例如:
stream
.transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
private ProcessorContext context;
@Override
public void init(final ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
context.forward(key, value);
context.forward(key + 1, value + 1;)
return null;
}
@Override
public void close() {
}
}, stateStoreName);
请注意,通过使用 context.forward()
,您没有编译时类型安全。如果转发类型不符合输出类型 KStream
(上例中的 <Integer, Integer>
)的记录,代码会编译但会在运行时抛出异常。
从 Kafka Streams 2.2 开始,您可以使用 flatTransform()
。使用 flatTransform()
,您可以 return 记录列表,而不是像上面的示例那样多次使用 context.forward()
和 return null
。以这种方式使用 flatTransform()
保证编译时类型安全。