Find the stream of events that are not grouped using CoGroupFunction
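When using CoGroupFunction, how can we find the stream of events that were not matched with any other event?
Let's assume people are communicating by phone. In Tuple2<String, Integer>, f0 is the person's name and f1 is the phone number they are calling or receiving a call from.
We have paired them using coGroup, but we are missing the people who are called by, or are calling, someone outside our data, i.e. the events whose number has no counterpart in the other stream.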
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple2<String, Integer>> callers = env.fromElements(
        new Tuple2<String, Integer>("alice->", 12), // alice dials 12
        new Tuple2<String, Integer>("bob->", 13), // bob dials 13
        new Tuple2<String, Integer>("charlie->", 19))
    .assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(5)));
DataStream<Tuple2<String, Integer>> callees = env.fromElements(
        new Tuple2<String, Integer>("->carl", 12), // carl received call
        new Tuple2<String, Integer>("->ted", 13),
        new Tuple2<String, Integer>("->chris", 7))
    .assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(5)));
DataStream<Tuple1<String>> groupedStream = callers.coGroup(callees)
    .where(evt -> evt.f1).equalTo(evt -> evt.f1)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .apply(new IntEqualCoGroupFunc());
groupedStream.print(); // prints 1> (alice->-->carl) \n 1> (bob->-->ted)
//DataStream<Tuple1<String>> notGroupedStream = ..; // people without pairs in last window
//notGroupedStream.print(); // should print charlie->-->someone \n someone->-->chris
env.execute();
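Honestly, the easiest solution seems to be to change IntEqualCoGroupFunc so that it returns (Boolean, String) instead of String.
This works because coGroup also processes elements that have no matching key. For those elements, one of the two Iterables in coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) is empty. In your case, since callees is the second input, the function receives an empty Iterable as first and ("->chris", 7) as second.
Changing the signature lets you easily emit the results that have no matching key and then split them into separate streams at a later stage of processing.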
// Implementation of IntEqualCoGroupFunc
// Note: each branch returns after the first element, so only one result is
// emitted per key and window. That is enough for the sample data, where each
// number appears at most once per stream.
@Override
public void coGroup(Iterable<Tuple2<String, Integer>> outbound, Iterable<Tuple2<String, Integer>> inbound,
        Collector<Tuple1<String>> out) throws Exception {
    for (Tuple2<String, Integer> outboundObj : outbound) {
        for (Tuple2<String, Integer> inboundObj : inbound) {
            out.collect(Tuple1.of(outboundObj.f0 + "-" + inboundObj.f0)); // matching pair
            return;
        }
        out.collect(Tuple1.of(outboundObj.f0 + "->someone")); // inbound is empty
        return;
    }
    // outbound is empty
    for (Tuple2<String, Integer> inboundObj : inbound) {
        out.collect(Tuple1.of("someone->-" + inboundObj.f0));
        return;
    }
    // both sides empty: a defensive fallback, not expected for keys that come from the data
    out.collect(Tuple1.of("someone->-->someone"));
}
The output is as follows:
2> (someone->-->chris)
2> (charlie->->someone)
1> (alice->-->carl)
1> (bob->-->ted)
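The code above emits plain strings, so matched and unmatched results still share one stream. The (Boolean, String) variant described earlier could look roughly like the sketch below. It is plain Java with no Flink dependency, only meant to illustrate the idea: CoGroupSketch, Call, Tagged, coGroupAll and select are hypothetical names, and the grouping by number stands in for what coGroup does per key and window. In a Flink job the analogue would presumably be a CoGroupFunction that emits Tuple2<Boolean, String>, followed by two filter calls on f0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class CoGroupSketch {

    /** A call event: a label such as "alice->" and the number dialed or answered. */
    static final class Call {
        final String name;
        final int number;
        Call(String name, int number) { this.name = name; this.number = number; }
    }

    /** A result tagged with whether both sides were present for the key. */
    static final class Tagged {
        final boolean matched;
        final String text;
        Tagged(boolean matched, String text) { this.matched = matched; this.text = text; }
    }

    /** Same three cases as the method above, but the result carries a flag. */
    static Tagged coGroup(List<Call> outbound, List<Call> inbound) {
        if (!outbound.isEmpty() && !inbound.isEmpty()) {
            return new Tagged(true, outbound.get(0).name + "-" + inbound.get(0).name);
        }
        if (!outbound.isEmpty()) {
            return new Tagged(false, outbound.get(0).name + "->someone");
        }
        return new Tagged(false, "someone->-" + inbound.get(0).name);
    }

    /** Groups the calls by number, keeping keys in ascending order. */
    static Map<Integer, List<Call>> groupByNumber(List<Call> calls) {
        Map<Integer, List<Call>> groups = new TreeMap<>();
        for (Call call : calls) {
            groups.computeIfAbsent(call.number, k -> new ArrayList<>()).add(call);
        }
        return groups;
    }

    /** Calls coGroup once for every number seen on either side. */
    static List<Tagged> coGroupAll(List<Call> callers, List<Call> callees) {
        Map<Integer, List<Call>> left = groupByNumber(callers);
        Map<Integer, List<Call>> right = groupByNumber(callees);
        TreeSet<Integer> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        List<Tagged> results = new ArrayList<>();
        for (Integer key : keys) {
            results.add(coGroup(
                left.getOrDefault(key, Collections.emptyList()),
                right.getOrDefault(key, Collections.emptyList())));
        }
        return results;
    }

    /** The "split" step: keeps only the results whose flag equals the argument. */
    static List<String> select(List<Tagged> results, boolean matched) {
        List<String> texts = new ArrayList<>();
        for (Tagged t : results) {
            if (t.matched == matched) {
                texts.add(t.text);
            }
        }
        return texts;
    }

    public static void main(String[] args) {
        List<Call> callers = Arrays.asList(
            new Call("alice->", 12), new Call("bob->", 13), new Call("charlie->", 19));
        List<Call> callees = Arrays.asList(
            new Call("->carl", 12), new Call("->ted", 13), new Call("->chris", 7));
        List<Tagged> all = coGroupAll(callers, callees);
        System.out.println("grouped: " + select(all, true));
        System.out.println("not grouped: " + select(all, false));
    }
}
```

With the sample data from the question, select(all, false) plays the role of notGroupedStream and select(all, true) the role of groupedStream. The flag keeps the coGroup function itself unchanged in structure; only the output type and the downstream split are new.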