How to resolve the error "Rowtime timestamp is null" on a Flink server
I am trying to run some code on a Flink server, but it fails.
Below is my code:
DataStream<UserInfo> keyedStream = executionEnvironment
        .addSource(new UserDataSource());
keyedStream.assignTimestampsAndWatermarks(new MessageWaterEmitter());
tableEnv.registerDataStream("test", keyedStream, "userId,ticks,startime.rowtime");
Table table = tableEnv
        .sqlQuery(
                "SELECT userId,COUNT(userId) as ticks,TUMBLE_END(startime,INTERVAL '5' SECOND) as startime FROM test "
                        + "GROUP BY TUMBLE(startime,INTERVAL '5' SECOND),userId");
DataStream<Row> userInfoDataStream = tableEnv.toRetractStream(table, Row.class)
        .filter(new FilterFunction<Tuple2<Boolean, Row>>() {
            @Override
            public boolean filter(Tuple2<Boolean, Row> booleanUserInfoTuple2) throws Exception {
                return booleanUserInfoTuple2.f0;
            }
        }).map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
            @Override
            public Row map(Tuple2<Boolean, Row> booleanUserInfoTuple2) throws Exception {
                return booleanUserInfoTuple2.f1;
            }
        });
JdbcSink sink = new JdbcSink();
userInfoDataStream.addSink(sink);
Below is the error I get:
java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
at DataStreamSourceConversion1.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at flink.source.UserDataSource.run(UserDataSource.java:20)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
Can anyone help me solve this problem? Thanks in advance.
The root cause of this problem is that when you call assignTimestampsAndWatermarks on keyedStream, you do nothing with the result of that call. If you rewrite your code like this, it will work:
DataStream<UserInfo> keyedStream = executionEnvironment
        .addSource(new UserDataSource())
        .assignTimestampsAndWatermarks(new MessageWaterEmitter());
tableEnv.registerDataStream("test", keyedStream, "userId,ticks,startime.rowtime");
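Calling assignTimestampsAndWatermarks on a stream does not modify that stream; instead, it returns a new stream that has timestamps and watermarks.
This can also be fixed like this, which may make it clearer what is happening: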
DataStream<UserInfo> streamWithTSandWMs = keyedStream
        .assignTimestampsAndWatermarks(new MessageWaterEmitter());
tableEnv.registerDataStream("test", streamWithTSandWMs, "userId,ticks,startime.rowtime");