加入两个流不起作用
Join of two streams not works
我正在尝试使用 Apache Flink streaming 加入两个流 API 但没有加入任何东西,在阅读文档后我不知道我做错了什么
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyPojo2> source = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ola"), new MyPojo2(2, "Ola")))
.assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>());
DataStream<MyPojo2> source2 = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ela"), new MyPojo2(2, "Ela")))
.assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>());
DataStream<Tuple2<String, String>> joined = source.join(source2).where(keySelector).equalTo(keySelector).
window(GlobalWindows.create()).apply(joinFunction);
joined.print();
env.execute("Window");
关键功能就是myPojo.getFirst()
除非您指定自定义 Trigger
,否则 GlobalWindows
window 永远不会触发。在您的示例中,如果您使用类似 TumblingEventTimeWindows.of(Time.seconds(5))
的内容,您应该会看到结果。
我正在尝试使用 Apache Flink streaming 加入两个流 API 但没有加入任何东西,在阅读文档后我不知道我做错了什么
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyPojo2> source = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ola"), new MyPojo2(2, "Ola")))
.assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>());
DataStream<MyPojo2> source2 = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ela"), new MyPojo2(2, "Ela")))
.assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>());
DataStream<Tuple2<String, String>> joined = source.join(source2).where(keySelector).equalTo(keySelector).
window(GlobalWindows.create()).apply(joinFunction);
joined.print();
env.execute("Window");
关键功能就是myPojo.getFirst()
除非您指定自定义 Trigger
,否则 GlobalWindows
window 永远不会触发。在您的示例中,如果您使用类似 TumblingEventTimeWindows.of(Time.seconds(5))
的内容,您应该会看到结果。