如何使用 Flink 按事件时间对流进行排序 SQL
How to sort a stream by event time using Flink SQL
我有一个乱序的 DataStream<Event>
我想对其进行排序,以便事件按事件时间戳排序。我已将我的用例简化为我的事件 class 只有一个字段——timestamp
字段:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
tableEnv.registerTable("events", events);
Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
sortedEventStream.print();
env.execute();
}
我收到这个错误:
Exception in thread "main"
org.apache.flink.table.api.SqlParserException: SQL parse failed.
Encountered "timestamp FROM" at line 1, column 8.
好像我没有以正确的方式指定事件时间属性,但不清楚哪里出了问题。
问题原来是在我的事件 class 中使用 timestamp
作为字段名称。将其更改为 eventTime
足以让一切正常工作:
public class Sort {
public static final int OUT_OF_ORDERNESS = 1000;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
tableEnv.registerTable("events", events);
Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
sortedEventStream.print();
env.execute();
}
public static class Event {
public Long eventTime;
Event() {
this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
}
}
private static class OutOfOrderEventSource implements SourceFunction<Event> {
private volatile boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while(running) {
ctx.collect(new Event());
Thread.sleep(1);
}
}
@Override
public void cancel() {
running = false;
}
}
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
public TimestampsAndWatermarks() {
super(Time.milliseconds(OUT_OF_ORDERNESS));
}
@Override
public long extractTimestamp(Event event) {
return event.eventTime;
}
}
}
我有一个乱序的 DataStream<Event>
我想对其进行排序,以便事件按事件时间戳排序。我已将我的用例简化为我的事件 class 只有一个字段——timestamp
字段:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
tableEnv.registerTable("events", events);
Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
sortedEventStream.print();
env.execute();
}
我收到这个错误:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "timestamp FROM" at line 1, column 8.
好像我没有以正确的方式指定事件时间属性,但不清楚哪里出了问题。
问题原来是在我的事件 class 中使用 timestamp
作为字段名称。将其更改为 eventTime
足以让一切正常工作:
public class Sort {
public static final int OUT_OF_ORDERNESS = 1000;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
tableEnv.registerTable("events", events);
Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
sortedEventStream.print();
env.execute();
}
public static class Event {
public Long eventTime;
Event() {
this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
}
}
private static class OutOfOrderEventSource implements SourceFunction<Event> {
private volatile boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while(running) {
ctx.collect(new Event());
Thread.sleep(1);
}
}
@Override
public void cancel() {
running = false;
}
}
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
public TimestampsAndWatermarks() {
super(Time.milliseconds(OUT_OF_ORDERNESS));
}
@Override
public long extractTimestamp(Event event) {
return event.eventTime;
}
}
}