Flink Table API & SQL 和映射类型 (Scala)

Flink Table API & SQL and map types (Scala)

我在 streaming[=] 中使用 Flink 的 Table API and/or Flink 的 SQL 支持(Flink 1.3.1,Scala 2.11) 46=] 环境。我从 DataStream[Person] 开始,Person 是一个案例 class,看起来像:

Person(name: String, age: Int, attributes: Map[String, String])

一切都按预期工作,直到我开始将 attributes 带入画面。

例如:

val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)

... 导致:

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341) at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun.apply(operators.scala:531) at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun.apply(operators.scala:530) at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.plan.logical.LogicalRelNode.(operators.scala:530) at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) at com.nordstrom.mdt.Job$.main(Job.scala:112) at com.nordstrom.mdt.Job.main(Job.scala)

注意:无论特定映射键是否存在,都会发生此错误。另请注意,如果我 而不是 指定地图键,我会得到一个不同的错误,这是有道理的;这种情况在这里没有发挥作用。

这个 PR 似乎 说有一条前进的道路:https://github.com/apache/flink/pull/3767. Looking specifically at the test case,它表明数据集可以使用类型信息。 None 的相关方法 fromDataStreamregisterDataStream 提供了一种提供类型信息的方法。

这可能吗?也就是说,Flink SQL on Streams 可以支持maps吗?

正在澄清编辑... 省略映射键(GROUP BY ... attributes 而不是 attributes['foo'])时,我得到以下错误。这表明运行时确实知道这些是字符串。

This type (interface scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)]) cannot be used as key.

目前FlinkSQL仅支持Javajava.util.Map。 Scala 映射被视为具有 Flink GenericTypeInfo/SQL ANY 数据类型的黑盒。因此,您可以转发这些黑盒并在标量函数中使用它们,但不支持使用 ['key'] 运算符访问。

因此,要么使用 Java 映射,要么在 UDF 中自己实现访问操作。

我为你的问题创建了一个问题:https://issues.apache.org/jira/browse/FLINK-7360