Flink Table API & SQL 和映射类型 (Scala)
Flink Table API & SQL and map types (Scala)
我在 streaming[=] 中使用 Flink 的 Table API and/or Flink 的 SQL 支持(Flink 1.3.1,Scala 2.11) 46=] 环境。我从 DataStream[Person]
开始,Person
是一个案例 class,看起来像:
Person(name: String, age: Int, attributes: Map[String, String])
一切都按预期工作,直到我开始将 attributes
带入画面。
例如:
val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)
... 导致:
Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun.apply(operators.scala:531)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun.apply(operators.scala:530)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.(operators.scala:530)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.nordstrom.mdt.Job$.main(Job.scala:112)
at com.nordstrom.mdt.Job.main(Job.scala)
注意:无论特定映射键是否存在,都会发生此错误。另请注意,如果我 而不是 指定地图键,我会得到一个不同的错误,这是有道理的;这种情况在这里没有发挥作用。
这个 PR 似乎 说有一条前进的道路:https://github.com/apache/flink/pull/3767. Looking specifically at the test case,它表明数据集可以使用类型信息。 None 的相关方法 fromDataStream
和 registerDataStream
提供了一种提供类型信息的方法。
这可能吗?也就是说,Flink SQL on Streams 可以支持maps吗?
正在澄清编辑...
当 省略映射键(GROUP BY ... attributes
而不是 attributes['foo']
)时,我得到以下错误。这表明运行时确实知道这些是字符串。
This type (interface scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)]) cannot be used as key.
目前FlinkSQL仅支持Javajava.util.Map
。 Scala 映射被视为具有 Flink GenericTypeInfo
/SQL ANY
数据类型的黑盒。因此,您可以转发这些黑盒并在标量函数中使用它们,但不支持使用 ['key']
运算符访问。
因此,要么使用 Java 映射,要么在 UDF 中自己实现访问操作。
我为你的问题创建了一个问题:https://issues.apache.org/jira/browse/FLINK-7360
我在 streaming[=] 中使用 Flink 的 Table API and/or Flink 的 SQL 支持(Flink 1.3.1,Scala 2.11) 46=] 环境。我从 DataStream[Person]
开始,Person
是一个案例 class,看起来像:
Person(name: String, age: Int, attributes: Map[String, String])
一切都按预期工作,直到我开始将 attributes
带入画面。
例如:
val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)
... 导致:
Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341) at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun.apply(operators.scala:531) at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun.apply(operators.scala:530) at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.plan.logical.LogicalRelNode.(operators.scala:530) at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) at com.nordstrom.mdt.Job$.main(Job.scala:112) at com.nordstrom.mdt.Job.main(Job.scala)
注意:无论特定映射键是否存在,都会发生此错误。另请注意,如果我 而不是 指定地图键,我会得到一个不同的错误,这是有道理的;这种情况在这里没有发挥作用。
这个 PR 似乎 说有一条前进的道路:https://github.com/apache/flink/pull/3767. Looking specifically at the test case,它表明数据集可以使用类型信息。 None 的相关方法 fromDataStream
和 registerDataStream
提供了一种提供类型信息的方法。
这可能吗?也就是说,Flink SQL on Streams 可以支持maps吗?
正在澄清编辑...
当 省略映射键(GROUP BY ... attributes
而不是 attributes['foo']
)时,我得到以下错误。这表明运行时确实知道这些是字符串。
This type (interface scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)]) cannot be used as key.
目前FlinkSQL仅支持Javajava.util.Map
。 Scala 映射被视为具有 Flink GenericTypeInfo
/SQL ANY
数据类型的黑盒。因此,您可以转发这些黑盒并在标量函数中使用它们,但不支持使用 ['key']
运算符访问。
因此,要么使用 Java 映射,要么在 UDF 中自己实现访问操作。
我为你的问题创建了一个问题:https://issues.apache.org/jira/browse/FLINK-7360