How to get array element from array column

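I'm using Flink 1.12, and I have the following simple code to demonstrate the use of an array-typed column.

I want to get the second element of the favorites array column, but when I run the application, the following exception is thrown: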

Type is not supported: ANY
org.apache.flink.table.api.TableException: Type is not supported: ANY
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:576)
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType(FlinkTypeFactory.scala)
    at org.apache.flink.table.planner.operations.PlannerQueryOperation.lambda$new$0(PlannerQueryOperation.java:55)
    at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
    at java.util.Collections.tryAdvance(Collections.java:4717)

Application code:

  test("test java po schema") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)
    val ds: DataStream[(String, Timestamp, Double, Seq[String])] = env.fromElements(("123", new Timestamp(new Date().getTime), 12.4, Seq("x", "y", "z")))
    val tenv = StreamTableEnvironment.create(env)
    val table = tenv.fromDataStream(ds, $"id", $"trade_date", $"price", $"favorites")

    table.printSchema()

    tenv.createTemporaryView("xxx", table)

    //I want to query the second element in the favorites array column
    tenv.sqlQuery("select favorites[1] from xxx").toAppendStream[Row].print()


    env.execute()

  }

This should work if the column is actually an array. It cannot be a Java List or a Scala Seq.
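
As a rough sketch of that suggestion, the snippet below is the question's code with the last tuple field declared as `Array[String]` instead of `Seq[String]`. It assumes the Flink 1.12 Scala DataStream and Table API bridge dependencies are on the classpath, and that the array field is then derived as an SQL `ARRAY<STRING>` column; I have not verified this against every 1.12 patch release. The object name `ArrayElementExample` is made up for illustration. Note also that Flink SQL array indexes start at 1, so the second element would be `favorites[2]`, not `favorites[1]`.

```scala
import java.sql.Timestamp

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Hypothetical wrapper object; the original code lived inside a test case.
object ArrayElementExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Array[String] rather than Seq[String]: the assumption is that a real
    // array maps to an ARRAY column, while Seq ends up as an unsupported type.
    val ds: DataStream[(String, Timestamp, Double, Array[String])] =
      env.fromElements(
        ("123", new Timestamp(System.currentTimeMillis()), 12.4, Array("x", "y", "z")))

    val tenv = StreamTableEnvironment.create(env)
    val table = tenv.fromDataStream(ds, $"id", $"trade_date", $"price", $"favorites")

    // Check here that favorites is reported as an array type.
    table.printSchema()

    tenv.createTemporaryView("xxx", table)

    // SQL array indexes are 1-based, so [2] selects the second element.
    tenv.sqlQuery("select favorites[2] from xxx").toAppendStream[Row].print()

    env.execute()
  }
}
```

If `printSchema()` still does not show an array type for `favorites`, the column is probably still being treated as a generic type, and the element access will fail the same way.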