How to resolve the error "Rowtime timestamp is null" in Flink

I am trying to run some code on a Flink server, but it fails.

Below is my code:

    DataStream<UserInfo> keyedStream = executionEnvironment
            .addSource(new UserDataSource());

    keyedStream.assignTimestampsAndWatermarks(new MessageWaterEmitter());

    tableEnv.registerDataStream("test", keyedStream, "userId,ticks,startime.rowtime");

    Table table = tableEnv
            .sqlQuery(
                    "SELECT userId,COUNT(userId) as ticks,TUMBLE_END(startime,INTERVAL '5' SECOND) as startime FROM test "
                            + "GROUP BY TUMBLE(startime,INTERVAL '5' SECOND),userId");

    DataStream<Row> userInfoDataStream = tableEnv.toRetractStream(table, Row.class)
            .filter(new FilterFunction<Tuple2<Boolean, Row>>() {
                @Override
                public boolean filter(Tuple2<Boolean, Row> booleanUserInfoTuple2) throws Exception {
                    return booleanUserInfoTuple2.f0;
                }
            }).map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
                @Override
                public Row map(Tuple2<Boolean, Row> booleanUserInfoTuple2) throws Exception {
                    return booleanUserInfoTuple2.f1;
                }
            });

    JdbcSink sink = new JdbcSink();
    userInfoDataStream.addSink(sink);

Below is the error I get:

 java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
    at DataStreamSourceConversion1.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
    at flink.source.UserDataSource.run(UserDataSource.java:20)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)

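Can anyone help me resolve this? Thanks in advance.

The root cause is that when you call assignTimestampsAndWatermarks on keyedStream, you do nothing with the result of that call. If you rewrite the code like this, it will work: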

    DataStream<UserInfo> keyedStream = executionEnvironment
        .addSource(new UserDataSource())
        .assignTimestampsAndWatermarks(new MessageWaterEmitter());

    tableEnv.registerDataStream("test", keyedStream, "userId,ticks,startime.rowtime");

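Calling assignTimestampsAndWatermarks on a stream does not modify that stream; it returns a new stream that carries the timestamps and watermarks.

It could also be fixed like this, which may make it clearer what is going on: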

    DataStream<UserInfo> streamWithTSandWMs = keyedStream
        .assignTimestampsAndWatermarks(new MessageWaterEmitter());

    tableEnv.registerDataStream("test", streamWithTSandWMs, "userId,ticks,startime.rowtime");
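
The pitfall described in this answer (a transformation that returns a new stream rather than changing the one it is called on) can be illustrated without Flink. The following is only a minimal plain-Java sketch of the pattern: SketchStream, assignTimestamps and hasTimestamp are hypothetical names made up for this illustration and are not part of the Flink API.

```java
import java.util.function.ToLongFunction;

public class ImmutableStreamSketch {

    /** Hypothetical stand-in for a stream holding one element; NOT a Flink class. */
    public static final class SketchStream<T> {
        private final T element;
        private final Long timestamp; // null means "no timestamp assigned"

        private SketchStream(T element, Long timestamp) {
            this.element = element;
            this.timestamp = timestamp;
        }

        public static <T> SketchStream<T> of(T element) {
            return new SketchStream<>(element, null);
        }

        /** Returns a NEW stream with a timestamp; the receiver is left unchanged. */
        public SketchStream<T> assignTimestamps(ToLongFunction<T> extractor) {
            return new SketchStream<>(element, extractor.applyAsLong(element));
        }

        public boolean hasTimestamp() {
            return timestamp != null;
        }
    }

    public static void main(String[] args) {
        SketchStream<String> source = SketchStream.of("user-1");

        // Same mistake as in the question: the returned stream is discarded.
        source.assignTimestamps(e -> 1000L);
        System.out.println("source has timestamp: " + source.hasTimestamp());

        // Same fix as in the answer: keep and use the returned stream.
        SketchStream<String> withTimestamps = source.assignTimestamps(e -> 1000L);
        System.out.println("result has timestamp: " + withTimestamps.hasTimestamp());
    }
}
```

In this sketch the first call leaves source without a timestamp, which mirrors why the table registered on the original keyedStream sees a null rowtime.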