使用 Kafka Stream 中的 State-store(RocksDB) 将一条记录转换为多条记录

Transform one record into more than one record using State-store(RocksDB) in Kafka Stream

我想使用 state-store(RocksDB) 将一条记录转换为多条记录。我知道有一种方法,例如 stream.transform(final TransformerSupplier> transformerSupplier,final String... stateStoreNames) 但是如何 return 多个 KeyValue 对,以便我以后可以使用分支发布到受人尊敬的话题?

有一种方法可以向下游转发数据,但我如何才能再次使用该数据?

Kafka 版本 - 1.1.0

如果要将记录转换为多个键值对,可以使用flatMap操作。您可以修改键和值并生成零个、一个或多个记录。

KStream<String, Integer> transformed = stream.flatMap(
     // Here, we generate two output records for each input record.
     // We also change the key and value types.
     // Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
    (key, value) -> {
      List<KeyValue<String, Integer>> result = new LinkedList<>();
      result.add(KeyValue.pair(value.toUpperCase(), 1000));
      result.add(KeyValue.pair(value.toLowerCase(), 9000));
      return result;
    }
  );

如果我没理解错的话,您想根据状态存储中的数据发出多条记录。使用 Kafka Streams 2.2 之前的 transform(),您可以通过在 Transformer 中多次调用 context.forward() 来实现。例如:

stream
   .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
       private ProcessorContext context;

       @Override
       public void init(final ProcessorContext context) {
           this.context = context;
       }

       @Override
       public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
           context.forward(key, value);
           context.forward(key + 1, value + 1;)
           return null;
       }

       @Override
       public void close() {
       }
   }, stateStoreName);

请注意,通过使用 context.forward(),您没有编译时类型安全。如果转发类型不符合输出类型 KStream(上例中的 <Integer, Integer>)的记录,代码会编译但会在运行时抛出异常。

从 Kafka Streams 2.2 开始,您可以使用 flatTransform()。使用 flatTransform(),您可以 return 记录列表,而不是像上面的示例那样多次使用 context.forward() 和 return null。以这种方式使用 flatTransform() 保证编译时类型安全。