Applying keyed state on top of a stream from CoGroupByKey
- I have two Kafka sources
- I am trying to perform a word count and merge the counts from the two streams
- I have created a 1-minute window for both data streams and applied CoGroupByKey; from the DoFn I am emitting <Key,Value> (word,count)
On top of this CoGroupByKey function I am applying a stateful ParDo.
Say I get (Test,2) from stream 1 and (Test,3) from stream 2 within the same window; then in the CoGroupByKey function I merge them into (Test,5). But if they do not fall into the same window, I emit (Test,2) and (Test,3) separately, and the state is then supposed to merge these elements, so in the end I should get (Test,5).
However, I am not getting the expected result: all elements from stream 1 go to one partition and the elements from stream 2 to another, which is why I get
(Test,2)
(Test,3)
// word count stream from kafka topic 1
PCollection<KV<String,Long>> stream1 = ...
// word count stream from kafka topic 2
PCollection<KV<String,Long>> stream2 = ...
PCollection<KV<String,Long>> windowed1 =
stream1.apply(
Window
.<KV<String,Long>>into(FixedWindows.of(Duration.millis(60000)))
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.millis(1000))
.discardingFiredPanes());
PCollection<KV<String,Long>> windowed2 =
stream2.apply(
Window
.<KV<String,Long>>into(FixedWindows.of(Duration.millis(60000)))
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.millis(1000))
.discardingFiredPanes());
// Anonymous subclasses ({}) preserve the type parameter for coder inference
final TupleTag<Long> count1 = new TupleTag<Long>() {};
final TupleTag<Long> count2 = new TupleTag<Long>() {};
// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedStream =
KeyedPCollectionTuple.of(count1, windowed1).and(count2, windowed2)
.apply(CoGroupByKey.<String>create());
// applying the stateful operation after the CoGroupByKey
PCollection<KV<String,Long>> finalCountStream =
joinedStream.apply(ParDo.of(
new DoFn<KV<String, CoGbkResult>, KV<String,Long>>() {
@StateId("stateId")
private final StateSpec<MapState<String, Long>> mapState =
StateSpecs.map();
@ProcessElement
public void processElement(
ProcessContext processContext,
@StateId("stateId") MapState<String, Long> state) {
KV<String, CoGbkResult> element = processContext.element();
// Read the values grouped under each stream's tag and sum them
Iterable<Long> counts1 = element.getValue().getAll(count1);
Iterable<Long> counts2 = element.getValue().getAll(count2);
Long sumAmount =
    StreamSupport
        .stream(
            Iterables.concat(counts1, counts2).spliterator(), false)
        .collect(Collectors.summingLong(n -> n));
System.out.println(element.getKey()+"::"+sumAmount);
// processContext.output(element.getKey()+"::"+sumAmount);
Long currCount =
state.get(element.getKey()).read() == null
? 0L
: state.get(element.getKey()).read();
Long newCount = currCount+sumAmount;
state.put(element.getKey(),newCount);
processContext.output(KV.of(element.getKey(),newCount));
}
}));
finalCountStream
.apply("finalState", ParDo.of(new DoFn<KV<String,Long>, String>() {
@StateId("myState")
private final StateSpec<MapState<String, Long>> mapState =
StateSpecs.map();
@ProcessElement
public void processElement(
ProcessContext c,
@StateId("myState") MapState<String, Long> state) {
KV<String,Long> e = c.element();
Long currCount = state.get(e.getKey()).read()==null
? 0L
: state.get(e.getKey()).read();
Long newCount = currCount+e.getValue();
state.put(e.getKey(),newCount);
c.output(e.getKey()+":"+newCount);
}
}))
.apply(KafkaIO.<Void, String>write()
.withBootstrapServers("localhost:9092")
.withTopic("test")
.withValueSerializer(StringSerializer.class)
.values());
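// Variant attempt: flatten the two input streams and apply the stateful
// DoFn directly, without CoGroupByKey (full pipeline below).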
PipelineOptions options = PipelineOptionsFactory.create();
options.as(FlinkPipelineOptions.class)
.setRunner(FlinkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<KV<String,Long>> stream1 = new KafkaWordCount("localhost:9092","test1")
.build(p);
PCollection<KV<String,Long>> stream2 = new KafkaWordCount("localhost:9092","test2")
.build(p);
PCollectionList<KV<String, Long>> pcs = PCollectionList.of(stream1).and(stream2);
PCollection<KV<String, Long>> merged = pcs.apply(Flatten.<KV<String, Long>>pCollections());
merged.apply("finalState", ParDo.of(new DoFn<KV<String,Long>, String>() {
@StateId("myState")
private final StateSpec<MapState<String, Long>> mapState = StateSpecs.map();
@ProcessElement
public void processElement(ProcessContext c, @StateId("myState") MapState<String, Long> state) {
KV<String,Long> e = c.element();
System.out.println("Thread ID :"+ Thread.currentThread().getId());
Long currCount = state.get(e.getKey()).read()==null? 0L:state.get(e.getKey()).read();
Long newCount = currCount+e.getValue();
state.put(e.getKey(),newCount);
c.output(e.getKey()+":"+newCount);
}
})).apply(KafkaIO.<Void, String>write()
.withBootstrapServers("localhost:9092")
.withTopic("test")
.withValueSerializer(StringSerializer.class)
.values()
);
p.run().waitUntilFinish();
You have set up both streams with the trigger Repeatedly.forever(AfterPane.elementCountAtLeast(1)) and discardingFiredPanes(). This causes the CoGroupByKey to output as soon as possible after every input element and then to reset its state each time, so it is expected behavior that it essentially passes each input straight through.
To explain further, CoGroupByKey executes like this:
- All elements from stream1 and stream2 are tagged the way you specified, so each (key, value1) in stream1 effectively becomes (key, (count1, value1)), and each (key, value2) in stream2 becomes (key, (count2, value2)).
- The tagged collections are flattened together, so there is now a single collection containing elements such as (key, (count1, value1)) and (key, (count2, value2)).
- The combined collection goes through a normal GroupByKey. This is where triggers take effect: with the default trigger you would get (key, [(count1, value1), (count2, value2), ...]) containing all the values for a key, but with your trigger you will often get separate (key, [(count1, value1)]) and (key, [(count2, value2)]) because each grouping fires output immediately.
- The output of the GroupByKey is merely wrapped in the CoGbkResult API; on many runners this is just a filtered view of the grouped iterable.
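One way to observe this (a debugging sketch only; count1 and count2 are the TupleTags from the question's code): log each pane as it arrives downstream of CoGroupByKey. With the element-count trigger you will typically see a separate early pane per input element, each carrying values under only one tag.
joinedStream.apply(ParDo.of(
    new DoFn<KV<String, CoGbkResult>, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // PaneInfo reveals the firing: timing (EARLY/ON_TIME/LATE) and index
        System.out.println(
            c.element().getKey()
                + " pane=" + c.pane()
                + " tag1=" + c.element().getValue().getAll(count1)
                + " tag2=" + c.element().getValue().getAll(count2));
      }
    }));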
Of course, triggers are inherently nondeterministic, and runners may also have different implementations of CoGroupByKey, but the behavior you are seeing is expected. You probably don't want to use that kind of trigger or discarding mode; otherwise you will need to do further grouping downstream. In general, a join via CoGBK requires some downstream work until Beam supports retractions.
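If the goal is one combined result per key per window, a minimal sketch (assuming the Kafka sources provide usable event-time watermarks) is to drop the custom trigger so each window fires once at the watermark:
PCollection<KV<String, Long>> windowed1 =
    stream1.apply(
        Window.<KV<String, Long>>into(
            FixedWindows.of(Duration.standardMinutes(1))));
PCollection<KV<String, Long>> windowed2 =
    stream2.apply(
        Window.<KV<String, Long>>into(
            FixedWindows.of(Duration.standardMinutes(1))));
// With the default trigger, CoGroupByKey emits a single pane per key and
// window once the watermark passes the end of the window, so values from
// both streams for that window arrive together in one CoGbkResult.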
Alternatively, you can use the Flatten + Combine approach, which should give you simpler code:
PCollection<KV<String, Long>> pc1 = ...;
PCollection<KV<String, Long>> pc2 = ...;
PCollectionList<KV<String, Long>> pcs = PCollectionList.of(pc1).and(pc2);
PCollection<KV<String, Long>> merged = pcs.apply(Flatten.<KV<String, Long>>pCollections());
merged.apply(window...).apply(Combine.perKey(Sum.ofLongs()))
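Spelled out, that might look like the sketch below; the 1-minute fixed window mirrors the question's setup and is an assumption, not a prescribed value.
PCollectionList<KV<String, Long>> pcs = PCollectionList.of(pc1).and(pc2);
PCollection<KV<String, Long>> merged =
    pcs.apply(Flatten.<KV<String, Long>>pCollections());
PCollection<KV<String, Long>> counts =
    merged
        .apply(Window.<KV<String, Long>>into(
            FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Combine.perKey(Sum.ofLongs()));
// Combine.perKey sums all Longs per key within each window, replacing both
// the CoGroupByKey and the stateful ParDo.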