Flink 1.4: Column 'rowtime' not found in any table
I am following the documentation on how to configure a TableSource with a rowtime attribute.
I register the timestamp field as follows:
KafkaTableSource source = Kafka08JsonTableSource.builder()
    // set Kafka topic
    .forTopic("alerting")
    // set Kafka consumer properties
    .withKafkaProperties(getKafkaProperties())
    // set Table schema
    .withSchema(TableSchema.builder()
        .field("tenant", Types.STRING())
        .field("message", Types.STRING())
        .field("frequency", Types.LONG())
        .field("timestamp", Types.SQL_TIMESTAMP()).build())
    .failOnMissingField(true)
    .withRowtimeAttribute(
        // "timestamp" is rowtime attribute
        "timestamp",
        // value of "timestamp" is extracted from existing field with same name
        new ExistingField("timestamp"),
        // values of "timestamp" are at most out-of-order by 1 day
        new BoundedOutOfOrderTimestamps(TimeUnit.DAYS.toMillis(1)))
    .build();

// register the alerting topic as table "kafka"
tEnv.registerTableSource("kafka", source);

Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " +
    "FROM kafka " +
    "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message");
tEnv.toAppendStream(results, Row.class).print();
and I get the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
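The field in your table kafka is called timestamp, not rowtime. You should therefore refer to the attribute by its name, timestamp, instead of rowtime.
Note that TIMESTAMP is a keyword in SQL, so you should either rename the timestamp attribute or escape the attribute name with backticks (`):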
tEnv.sqlQuery(
"SELECT tenant, message, SUM(frequency) " +
"FROM kafka " +
"GROUP BY HOP(`timestamp`, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message");
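If the column name comes from configuration rather than being hard-coded, the escaping can be kept in one place. The following is only a minimal sketch in plain Java: `SqlIdentifiers`, `quote` and `hopQuery` are hypothetical names and not part of Flink, and the helper simply rejects names that already contain a backtick instead of trying to escape them.

```java
final class SqlIdentifiers {
    private SqlIdentifiers() {}

    /** Wraps a column name in backticks so that reserved words such as TIMESTAMP can be used. */
    static String quote(String name) {
        // Assumption for this sketch: names containing a backtick are not supported at all.
        if (name == null || name.isEmpty() || name.indexOf('`') >= 0) {
            throw new IllegalArgumentException("Unsupported identifier: " + name);
        }
        return "`" + name + "`";
    }

    /** Builds the hopping-window query shown above for a given rowtime column. */
    static String hopQuery(String rowtimeColumn) {
        return "SELECT tenant, message, SUM(frequency) "
            + "FROM kafka "
            + "GROUP BY HOP(" + quote(rowtimeColumn)
            + ", INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message";
    }
}
```

With this in place, `tEnv.sqlQuery(SqlIdentifiers.hopQuery("timestamp"))` would submit the same query as the snippet above.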
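By the way, a BoundedOutOfOrderTimestamps delay of one day is quite a lot. It can cause significant processing latency and a large state size, because the query will collect one day of data before it starts emitting results and discarding state.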
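To get a feel for the numbers, here is a rough model in plain Java, not Flink code. It assumes that the watermark is the highest timestamp seen so far minus the configured delay, and that a window with exclusive end `windowEndMs` can fire once the watermark reaches `windowEndMs - 1`. The class and method names are made up for illustration.

```java
final class WatermarkDelaySketch {
    private WatermarkDelaySketch() {}

    /** Watermark under the assumed strategy: highest timestamp seen so far minus the delay. */
    static long watermark(long maxSeenTimestampMs, long delayMs) {
        return maxSeenTimestampMs - delayMs;
    }

    /**
     * Smallest event timestamp that has to arrive before a window ending at
     * windowEndMs (exclusive) can fire, i.e. before watermark >= windowEndMs - 1.
     */
    static long timestampNeededToFire(long windowEndMs, long delayMs) {
        return windowEndMs - 1 + delayMs;
    }
}
```

For a 5-second window ending at 5000 ms, a 30-second delay (30,000 ms) means the result can be emitted once an event with timestamp 34,999 ms has been seen. With a one-day delay (86,400,000 ms) the same window has to wait for an event with timestamp 86,404,999 ms, and its state is held until then.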