Found more than one rowtime field for the event time interval join
I have two streams that I want to interval-join. The event types are the case classes defined below; tradeDate is of type java.sql.Timestamp.
case class Stock(id: String, tradeDate: Timestamp, price: Double)
case class StockNameChanging(id: String, name: String, tradeDate: Timestamp)
When I run the application below, it throws the following exception. I don't understand what it is telling me or how to fix it.
org.apache.flink.table.api.TableException: Found more than one rowtime field: [rt1, rt2] in the table that should be converted to a DataStream.
Please select the rowtime field that should be used as event-time timestamp for the DataStream by casting all other fields to TIMESTAMP.
The code is:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//Stock stream
val ds1 = env.addSource(new IntervalJoinStockSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockWatermarkGenerator(4000))
//StockNameChanging stream
val ds2 = env.addSource(new IntervalJoinStockNameChangingSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockNameChangingWatermarkGenerator(4000))
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("s1", ds1, $"id", $"price", $"tradeDate".rowtime() as "rt1")
tenv.createTemporaryView("s2", ds2, $"id", $"name", $"tradeDate".rowtime() as "rt2")
tenv.from("s1").printSchema()
tenv.from("s2").printSchema()
val sql =
"""
select s1.id, s2.name, s1.price, s1.rt1, s2.rt2
from s1 join s2
on s1.id = s2.id
where s1.rt1 between s2.rt2 - interval '2' second and s2.rt2 + interval '2' second
""".stripMargin(' ')
tenv.sqlQuery(sql).toAppendStream[Row].print()
env.execute()
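When Flink does event-time processing, each event needs a single event-time timestamp. With Flink SQL, a row must have exactly one rowtime attribute if event-time processing is to be done on it. Your query, however, produces a table with two event-time attributes, s1.rt1 and s2.rt2. The Flink SQL runtime is complaining because it cannot assign a unique timestamp to the rows of this result table when converting it to a DataStream.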
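Since you are not doing any further event-time-based processing in this pipeline, you don't really need these columns to be treated as rowtime columns, so you can CAST either one or both of them to a plain timestamp. I believe something like this will work: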
SELECT
s1.id, s2.name, s1.price, CAST(s1.rt1 AS TIMESTAMP) AS t1, CAST(s2.rt2 AS TIMESTAMP) AS t2
FROM
s1 join s2
...
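For reference, here is a minimal end-to-end sketch of how that could look in the program from the question. It is an untested illustration, not a verified fix: it assumes a Flink version with the same Scala Table API bridge the question uses (roughly 1.12 or 1.13, where event time is the default), it fills in the elided part of the query with the join condition and time bounds from the question, and it swaps the custom sources and watermark generators for hypothetical in-memory data with `fromElements` and `assignAscendingTimestamps`. The object name `IntervalJoinCastSketch` is made up for the example.

```scala
import java.sql.Timestamp

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

case class Stock(id: String, tradeDate: Timestamp, price: Double)
case class StockNameChanging(id: String, name: String, tradeDate: Timestamp)

// Hypothetical name; a sketch of the question's job with both rowtime columns cast.
object IntervalJoinCastSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Assumption: in-memory records stand in for the custom sources in the question.
    val ds1 = env
      .fromElements(Stock("id1", new Timestamp(1000L), 10.0))
      .assignAscendingTimestamps(_.tradeDate.getTime)
    val ds2 = env
      .fromElements(StockNameChanging("id1", "name1", new Timestamp(2000L)))
      .assignAscendingTimestamps(_.tradeDate.getTime)

    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("s1", ds1, $"id", $"price", $"tradeDate".rowtime() as "rt1")
    tenv.createTemporaryView("s2", ds2, $"id", $"name", $"tradeDate".rowtime() as "rt2")

    // The join condition still uses rt1 and rt2 as rowtime attributes;
    // only the projected output columns are cast to plain timestamps.
    val sql =
      """
        |SELECT
        |  s1.id, s2.name, s1.price,
        |  CAST(s1.rt1 AS TIMESTAMP) AS t1,
        |  CAST(s2.rt2 AS TIMESTAMP) AS t2
        |FROM s1 JOIN s2
        |ON s1.id = s2.id
        |WHERE s1.rt1 BETWEEN s2.rt2 - INTERVAL '2' SECOND AND s2.rt2 + INTERVAL '2' SECOND
        |""".stripMargin

    // The result table no longer has two rowtime fields, so the conversion
    // to a DataStream should not raise the TableException from the question.
    tenv.sqlQuery(sql).toAppendStream[Row].print()
    env.execute()
  }
}
```

If you do need event-time processing downstream, cast only one of the two columns, so that the remaining rowtime field becomes the event-time timestamp of the resulting DataStream, as the exception message suggests.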