如何从数组列中获取数组元素
How to get array element from array column
我正在使用 Flink 1.12,我有以下简单代码来演示数组类型列的用法。
我想获取收藏夹数组列中的第二个元素,但是当我 运行 运行 应用程序时,
以下异常抛出:
Type is not supported: ANY
org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:576)
at org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType(FlinkTypeFactory.scala)
at org.apache.flink.table.planner.operations.PlannerQueryOperation.lambda$new[=10=](PlannerQueryOperation.java:55)
at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
at java.util.Collections.tryAdvance(Collections.java:4717)
申请代码:
test("test java po schema") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(1)
val ds: DataStream[(String, Timestamp, Double, Seq[String])] = env.fromElements(("123", new Timestamp(new Date().getTime), 12.4, Seq("x", "y", "z")))
val tenv = StreamTableEnvironment.create(env)
val table = tenv.fromDataStream(ds, $"id", $"trade_date", $"price", $"favorites")
table.printSchema()
tenv.createTemporaryView("xxx", table)
//I want to query the second element in the favorites array column
tenv.sqlQuery("select favorites[1] from xxx").toAppendStream[Row].print()
env.execute()
}
如果该列实际上是一个数组,这应该有效——它不能是一个 Java 列表或一个 Scala Seq。
我正在使用 Flink 1.12,我有以下简单代码来演示数组类型列的用法。
我想获取收藏夹数组列中的第二个元素,但是当我 运行 运行 应用程序时, 以下异常抛出:
Type is not supported: ANY
org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:576)
at org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType(FlinkTypeFactory.scala)
at org.apache.flink.table.planner.operations.PlannerQueryOperation.lambda$new[=10=](PlannerQueryOperation.java:55)
at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
at java.util.Collections.tryAdvance(Collections.java:4717)
申请代码:
test("test java po schema") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(1)
val ds: DataStream[(String, Timestamp, Double, Seq[String])] = env.fromElements(("123", new Timestamp(new Date().getTime), 12.4, Seq("x", "y", "z")))
val tenv = StreamTableEnvironment.create(env)
val table = tenv.fromDataStream(ds, $"id", $"trade_date", $"price", $"favorites")
table.printSchema()
tenv.createTemporaryView("xxx", table)
//I want to query the second element in the favorites array column
tenv.sqlQuery("select favorites[1] from xxx").toAppendStream[Row].print()
env.execute()
}
如果该列实际上是一个数组,这应该有效——它不能是一个 Java 列表或一个 Scala Seq。