无法为不存在的运算符 Timestamps/Watermarks 设置 uid

Can't set uid for operator Timestamps/Watermarks that doesn't exist

我正在做一个项目,并打开了 Flink 选项 disableAutoGeneratedUIDs 以确保为所有内容提供正确的 uid。但是我在尝试 运行 作业时遇到错误:

java.lang.IllegalStateException: Auto generated UIDs have been disabled but no UID or hash has been assigned to operator Timestamps/Watermarks

现在,我 运行 进入我在没有设置 uid 时(如预期)所做的操作符的错误,但这个操作符不是我创建或命名的。所以我假设它是用于创建源函数正在使用的 WatermarkStrategy 的代码,但我不能给 WatermarkStrategy 一个 uid,所以这是一个死胡同。

我在网上查了一下,发现一年前有一个错误说分区需要一个 uid 而它不应该有,我想知道这是否是一个类似的情况 (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-assign-a-UID-to-a-KeyedStream-td32052.html)。

接下来我可以尝试什么?

刚刚弄明白了。我认为这只是一个疏忽。

所以代码最初只是:

getSource().name(NAME).assignTimestampsAndWatermarks(getWatermarkStrategy())
            .process(new CountOperation())
            .name(COUNT_OPERATION_NAME);

在我的第一次尝试中,我尝试在此处添加 uid:

getSource().name(NAME).assignTimestampsAndWatermarks(getWatermarkStrategy())
            .uid("WatermarkUid")
            .process(new CountOperation())
            .name(COUNT_OPERATION_NAME);

认为这将处理整个“片段”。然后,当我第一次收到错误提示“此计数操作名称没有 uid”时,我将 uid 移到了 .process:

之后
getSource().name(NAME).assignTimestampsAndWatermarks(getWatermarkStrategy())
            .process(new CountOperation())
            .uid("WatermarkUid")
            .name(COUNT_OPERATION_NAME);

然后就是我看到错误说运算符 Timestamps/Watermarks 没有 Uid 的时候。然后我开始寻找在代码中命名的运算符。但是在我模糊的“星期五大脑”逻辑中,我不知何故没有弄清楚第一个 uid 解决了错误,因为这是 assignTimestampsAndWatermarks 的默认名称(感谢@hourcos 提供的提示在仪表板 ui。看到操作员块中的名称使一切都点击)。那么解决它的方法就是像这样添加一个 uid:

getSource().name(NAME).assignTimestampsAndWatermarks(getWatermarkStrategy())
            .uid("WatermarkUid")
            .process(new CountOperation())
            .uid("ToCountUid")
            .name(COUNT_OPERATION_NAME);