Applying keyed state on top of a stream from CoGroupByKey

Sample output:

(Test,2)
(Test,3)
// word count stream from kafka topic 1
PCollection<KV<String,Long>> stream1 = ... 

// word count stream from kafka topic 2
PCollection<KV<String,Long>> stream2 = ... 

PCollection<KV<String,Long>> windowed1 = 
  stream1.apply(
    Window
      .<KV<String,Long>>into(FixedWindows.of(Duration.millis(60000)))
      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
      .withAllowedLateness(Duration.millis(1000))
      .discardingFiredPanes());

PCollection<KV<String,Long>> windowed2 = 
  stream2.apply(
    Window
      .<KV<String,Long>>into(FixedWindows.of(Duration.millis(60000)))
      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
      .withAllowedLateness(Duration.millis(1000))
      .discardingFiredPanes());

final TupleTag<Long> count1 = new TupleTag<Long>();
final TupleTag<Long> count2 = new TupleTag<Long>();

// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedStream =
    KeyedPCollectionTuple.of(count1, windowed1).and(count2, windowed2)
      .apply(CoGroupByKey.<String>create());

// Apply a stateful per-key count on top of the CoGroupByKey output

PCollection<KV<String,Long>> finalCountStream =
  joinedStream.apply(ParDo.of(
    new DoFn<KV<String, CoGbkResult>, KV<String,Long>>() {

      @StateId("count")
      private final StateSpec<MapState<String, Long>> mapState =
          StateSpecs.map();

      @ProcessElement
      public void processElement(
        ProcessContext processContext,
        @StateId("count") MapState<String, Long> state) {

          KV<String, CoGbkResult> element = processContext.element();
          Iterable<Long> counts1 = element.getValue().getAll(count1);
          Iterable<Long> counts2 = element.getValue().getAll(count2);
          Long sumAmount =
              StreamSupport
                .stream(
                    Iterables.concat(counts1, counts2).spliterator(), false)
                .collect(Collectors.summingLong(n -> n));

          System.out.println(element.getKey()+"::"+sumAmount);
          //  processContext.output(element.getKey()+"::"+sumAmount);

          Long stored = state.get(element.getKey()).read();
          Long currCount = stored == null ? 0L : stored;
          Long newCount = currCount + sumAmount;
          state.put(element.getKey(), newCount);
          processContext.output(KV.of(element.getKey(), newCount));
        }
      }));

finalCountStream
    .apply("finalState", ParDo.of(new DoFn<KV<String,Long>, String>() {

      @StateId("finalCount")
      private final StateSpec<MapState<String, Long>> mapState =
        StateSpecs.map();

      @ProcessElement
      public void processElement(
        ProcessContext c,
        @StateId("finalCount") MapState<String, Long> state) {

          KV<String,Long> e = c.element();
          Long stored = state.get(e.getKey()).read();
          Long currCount = stored == null ? 0L : stored;
          Long newCount = currCount + e.getValue();
          state.put(e.getKey(), newCount);
          c.output(e.getKey()+":"+newCount);
        }

      }))
    .apply(KafkaIO.<Void, String>write()
                  .withBootstrapServers("localhost:9092")
                  .withTopic("test")
                  .withValueSerializer(StringSerializer.class)
                  .values());
The full pipeline, using Flatten to merge the two streams instead of CoGroupByKey:

PipelineOptions options = PipelineOptionsFactory.create();
options.as(FlinkPipelineOptions.class)
    .setRunner(FlinkRunner.class);

Pipeline p = Pipeline.create(options);

// word count stream from kafka topic 1
PCollection<KV<String,Long>> stream1 = new KafkaWordCount("localhost:9092","test1")
    .build(p);

// word count stream from kafka topic 2
PCollection<KV<String,Long>> stream2 = new KafkaWordCount("localhost:9092","test2")
    .build(p);

PCollectionList<KV<String, Long>> pcs = PCollectionList.of(stream1).and(stream2);
PCollection<KV<String, Long>> merged = pcs.apply(Flatten.<KV<String, Long>>pCollections());

    merged.apply("finalState", ParDo.of(new DoFn<KV<String,Long>, String>() {

        @StateId(myState)
        private final StateSpec<MapState<String, Long>> mapState = StateSpecs.map();

        @ProcessElement
        public void processElement(ProcessContext c, @StateId(myState) MapState<String, Long> state){

            KV<String,Long> e = c.element();
            System.out.println("Thread ID :"+ Thread.currentThread().getId());
            Long currCount = state.get(e.getKey()).read()==null? 0L:state.get(e.getKey()).read();
            Long newCount = currCount+e.getValue();
            state.put(e.getKey(),newCount);
            c.output(e.getKey()+":"+newCount);
        }

    })).apply(KafkaIO.<Void, String>write()
            .withBootstrapServers("localhost:9092")
            .withTopic("test")
            .withValueSerializer(StringSerializer.class)
            .values()
    );

p.run().waitUntilFinish();

You have set up both streams with the trigger Repeatedly.forever(AfterPane.elementCountAtLeast(1)) and discardingFiredPanes(). This causes the CoGroupByKey to output as fast as possible after each input element and then reset its state every time. So it is expected behavior that it basically passes each input straight through.
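
If you want each firing to reflect everything seen so far in the window rather than only the newest elements, one option is accumulating mode. A minimal sketch, reusing the question's windowing (note that this changes what downstream sees: the stateful sum would then re-add values it has already counted, so the downstream logic would have to change as well):

PCollection<KV<String,Long>> windowedAccumulating =
  stream1.apply(
    Window
      .<KV<String,Long>>into(FixedWindows.of(Duration.millis(60000)))
      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
      .withAllowedLateness(Duration.millis(1000))
      // keep fired elements, so each pane re-emits the accumulated window contents
      .accumulatingFiredPanes());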

To explain a bit more, CoGroupByKey executes like this (a conceptual sketch follows the list):

  • All elements from stream1 and stream2 are tagged the way you specified. So each (key, value1) in stream1 effectively becomes (key, (count1, value1)), and each (key, value2) in stream2 becomes (key, (count2, value2)).
  • These tagged collections are flattened together, so there is now a single collection containing elements of the form (key, (count1, value1)) and (key, (count2, value2)).
  • The merged collection goes through a normal GroupByKey. This is where triggers take effect. With the default trigger, you would get (key, [(count1, value1), (count2, value2), ...]) containing all the values for a key. With your trigger, you will mostly get separate (key, [(count1, value1)]) and (key, [(count2, value2)]) outputs, because each grouping fires immediately.
  • The output of the GroupByKey is just wrapped in the CoGbkResult API. In many runners this is simply a filtered view of the grouped iterable.
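
A minimal sketch of what this expansion means for a consumer, reusing joinedStream and the tags count1 and count2 from the question (with the per-element trigger, one of the two iterables will usually be empty in any given pane):

joinedStream.apply(ParDo.of(
  new DoFn<KV<String, CoGbkResult>, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      KV<String, CoGbkResult> e = c.element();
      // Each pane contains only what was grouped before the trigger fired;
      // with per-element firing that is usually a single value from one input.
      Iterable<Long> fromStream1 = e.getValue().getAll(count1);
      Iterable<Long> fromStream2 = e.getValue().getAll(count2);
      c.output(e.getKey() + " -> " + fromStream1 + " / " + fromStream2);
    }
  }));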

Of course, triggering is nondeterministic, and runners may also have different implementations of CoGroupByKey. But the behavior you are seeing is expected: you probably don't want to use that trigger or discarding mode, or you will need to do more grouping downstream, as sketched below.
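
One form that extra downstream grouping could take (a sketch under an assumption: perPaneSums stands for the raw per-pane sums emitted by the CoGBK DoFn, before any stateful counting):

// Re-window the per-pane partial sums into the same fixed windows with the
// default trigger, so each window's fragments consolidate into one sum per key.
PCollection<KV<String, Long>> consolidated =
  perPaneSums
    .apply(Window.<KV<String,Long>>into(FixedWindows.of(Duration.millis(60000)))
      .triggering(DefaultTrigger.of())
      .withAllowedLateness(Duration.millis(1000))
      .discardingFiredPanes())
    .apply(Sum.longsPerKey());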

In general, doing a join with CoGBK will require some downstream work until Beam supports retractions.

Alternatively, you can use the Flatten + Combine approach, which should give you simpler code:

   PCollection<KV<String, Long>> pc1 = ...;
   PCollection<KV<String, Long>> pc2 = ...;
   PCollectionList<KV<String, Long>> pcs = PCollectionList.of(pc1).and(pc2);
   PCollection<KV<String, Long>> merged = pcs.apply(Flatten.<KV<String, Long>>pCollections());
   merged.apply(window...).apply(Combine.perKey(Sum.ofLongs()));
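
Filled in, that could look like the following (the windowing mirrors the question's 60-second fixed windows; with the default trigger each window emits a single sum per key):

   PCollection<KV<String, Long>> merged =
       PCollectionList.of(pc1).and(pc2)
           .apply(Flatten.<KV<String, Long>>pCollections());

   PCollection<KV<String, Long>> sums =
       merged
           .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.millis(60000))))
           .apply(Combine.perKey(Sum.ofLongs()));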