JOIN single table flink tableapi 两列

JOIN single table flink tableapi by two columns

我有一个 table 数据,我需要通过两个字段进行连接。

我写了一个请求,但是没有用

SELECT * 
FROM Data t1 
JOIN Data t2 ON t1.s = t2.o

密码是

val csvTableSource = CsvTableSource
  .builder
  .path("src/main/resources/data.dat")
  .field("s", Types.STRING)
  .field("p", Types.STRING)
  .field("o", Types.STRING)
  .field("TIMESTAMP", Types.STRING)
  .fieldDelimiter(",")
  .ignoreFirstLine
  .ignoreParseErrors
  .commentPrefix("%")
  .build()
tableEnv.registerTableSource("Data", csvTableSource)

val query = "SELECT * FROM Data t1 JOIN Data t2 ON t1.s = t2.o"
val table = tableEnv.sqlQuery(query)

我得到以下异常

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalJoin(condition=[=([=13=], )], joinType=[inner])
  FlinkLogicalTableSourceScan(table=[[Data]], fields=[s, p, o, TIMESTAMP], source=[CsvTableSource(read fields: s, p, o, TIMESTAMP)])
  FlinkLogicalTableSourceScan(table=[[Data]], fields=[s, p, o, TIMESTAMP], source=[CsvTableSource(read fields: s, p, o, TIMESTAMP)])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.

我猜,您正在尝试 运行 在流式环境中进行此查询。 Flink 1.5.0 添加了流表上的非窗口连接。

所以您正在尝试使用 Flink 1.4.2 尚不支持的功能。

您可以切换到批处理环境,考虑到您正在读取 CSV 文件,或者升级到 Flink 1.5.0。