Flink datastream keyby 使用复合键
Flink datastream keyby using composite key
我的问题与 非常相似,只是那个问题是针对 Java 的,我需要 Scala 中的答案。我在 IntelliJ 中复制粘贴了提供的解决方案,它自动将复制粘贴的代码片段转换为 Scala,然后我对其进行了编辑以适合我的代码。我仍然遇到编译错误(甚至在编译 IntelliJ 能够检测到代码问题之前)。基本上,提供给 keyBy 的参数(return keySelector 的 getKey 函数的值)与 keyBy 函数的任何重载版本所期望的参数不匹配。
查找了许多 KeySelector 的 scala 代码示例,其中 return 是一个复合键,但没有找到。
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple2
import org.myorg.aarna.AAPerMinData
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new
KeySelector[AAPerMinData, Tuple2[String, String]]() {
@throws[Exception]
override def getKey(value: AAPerMinData): Tuple2[String, String] =
Tuple2.of(value.field1, value.field2)
})
编译代码时出现以下错误:
Error:(213, 64) overloaded method value keyBy with alternatives:
[K](fun: org.myorg.aarna.AAPerMinData => K)(implicit evidence :org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,K] <and>
(firstField: String,otherFields:
String*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple] <and>
(fields: Int*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple]
cannot be applied to (org.apache.flink.api.java.functions.KeySelector[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple2[String,String]])
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, Tuple2[String, String]]() {
我不确定导致此错误的语法中缺少什么。任何帮助是极大的赞赏。解决此问题后的下一步是根据复合键执行基于 TumblingWindow 的摘要。
更新 1 (12/29/2018):
更改代码以使用简单的字符串类型字段作为使用 KeySelector 格式的键(我知道这可以以更简单的方式完成,我这样做只是为了让基本的 KeySelector 工作)。
import org.apache.flink.api.java.functions.KeySelector
import org.myorg.aarna.AAPerMinData
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, String]() {
@throws[Exception]
override def getKey(value: AAPerMinData): String = value.set1.sEntId
})
这是我得到的错误的屏幕截图(即 IntelliJ 在鼠标悬停时显示此错误)。
更新 2 (12/29/2018)
这有效(对于单个钥匙盒)
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[String]
(_.set1.sEntId)
这不起作用(对于复合键大小写)
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy([String, String)](_.set1.sEntId, _.set1.field2)
更新 3 (12/29/2018)
尝试了以下,无法让它工作。查看错误屏幕截图。
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)]((_.set1.sEntId, _.set1.field2))
更新 4 (12/30/2018)
现在已解决,请参阅已接受的答案。对于任何可能感兴趣的人,这是最终的工作代码,包括使用复合键进行聚合:
// Composite key
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))
// Tumbling window
val aggr_keyed_stream = aa_stats_keyed_stream.window(TumblingEventTimeWindows.of(Time.seconds(60)))
// all set for window based aggregation of a "composite keyed" stream
val aggr_stream = aggr_keyed_stream.apply { (key: (String, String), window: TimeWindow, events: Iterable[AAPerMinData],
out: Collector[AAPerMinDataAggr]) =>
out.collect(AAPerMinDataAggrWrapper(key._1 + key._2, // composite
key._1, key._2, // also needed individual pieces
window,
events,
stream_deferred_live_duration_in_seconds*1000).getAAPerMinDataAggr)}
// print the "mapped" stream for debugging purposes
aggr_stream.print()
首先,虽然没有必要,但请继续使用 Scala 元组。它会让事情变得更容易,除非你出于某种原因必须与 Java 元组进行互操作。
然后,不要使用 org.apache.flink.api.java.functions.KeySelector。您想要使用来自 org.apache.flink.streaming.api.scala.DataStream:
的这个 keyBy
/**
* Groups the elements of a DataStream by the given K key to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
val cleanFun = clean(fun)
val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
override def getProducedType: TypeInformation[K] = keyType
}
asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
}
换句话说,只需传递一个将您的流元素转换为键值的函数(通常,Flink 的 scala API 尽量做到地道)。所以像这样的东西应该可以完成工作:
aa_stats_stream_w_timestamps.keyBy[String](value => value.set1.sEntId)
更新:
对于组合键大小写,使用
aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))
我的问题与
查找了许多 KeySelector 的 scala 代码示例,其中 return 是一个复合键,但没有找到。
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple2
import org.myorg.aarna.AAPerMinData
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new
KeySelector[AAPerMinData, Tuple2[String, String]]() {
@throws[Exception]
override def getKey(value: AAPerMinData): Tuple2[String, String] =
Tuple2.of(value.field1, value.field2)
})
编译代码时出现以下错误:
Error:(213, 64) overloaded method value keyBy with alternatives:
[K](fun: org.myorg.aarna.AAPerMinData => K)(implicit evidence :org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,K] <and>
(firstField: String,otherFields:
String*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple] <and>
(fields: Int*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple]
cannot be applied to (org.apache.flink.api.java.functions.KeySelector[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple2[String,String]])
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, Tuple2[String, String]]() {
我不确定导致此错误的语法中缺少什么。任何帮助是极大的赞赏。解决此问题后的下一步是根据复合键执行基于 TumblingWindow 的摘要。
更新 1 (12/29/2018): 更改代码以使用简单的字符串类型字段作为使用 KeySelector 格式的键(我知道这可以以更简单的方式完成,我这样做只是为了让基本的 KeySelector 工作)。
import org.apache.flink.api.java.functions.KeySelector
import org.myorg.aarna.AAPerMinData
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, String]() {
@throws[Exception]
override def getKey(value: AAPerMinData): String = value.set1.sEntId
})
这是我得到的错误的屏幕截图(即 IntelliJ 在鼠标悬停时显示此错误)。
更新 2 (12/29/2018)
这有效(对于单个钥匙盒)
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[String]
(_.set1.sEntId)
这不起作用(对于复合键大小写)
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy([String, String)](_.set1.sEntId, _.set1.field2)
更新 3 (12/29/2018) 尝试了以下,无法让它工作。查看错误屏幕截图。
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)]((_.set1.sEntId, _.set1.field2))
更新 4 (12/30/2018) 现在已解决,请参阅已接受的答案。对于任何可能感兴趣的人,这是最终的工作代码,包括使用复合键进行聚合:
// Composite key
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))
// Tumbling window
val aggr_keyed_stream = aa_stats_keyed_stream.window(TumblingEventTimeWindows.of(Time.seconds(60)))
// all set for window based aggregation of a "composite keyed" stream
val aggr_stream = aggr_keyed_stream.apply { (key: (String, String), window: TimeWindow, events: Iterable[AAPerMinData],
out: Collector[AAPerMinDataAggr]) =>
out.collect(AAPerMinDataAggrWrapper(key._1 + key._2, // composite
key._1, key._2, // also needed individual pieces
window,
events,
stream_deferred_live_duration_in_seconds*1000).getAAPerMinDataAggr)}
// print the "mapped" stream for debugging purposes
aggr_stream.print()
首先,虽然没有必要,但请继续使用 Scala 元组。它会让事情变得更容易,除非你出于某种原因必须与 Java 元组进行互操作。
然后,不要使用 org.apache.flink.api.java.functions.KeySelector。您想要使用来自 org.apache.flink.streaming.api.scala.DataStream:
的这个 keyBy/**
* Groups the elements of a DataStream by the given K key to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
val cleanFun = clean(fun)
val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
override def getProducedType: TypeInformation[K] = keyType
}
asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
}
换句话说,只需传递一个将您的流元素转换为键值的函数(通常,Flink 的 scala API 尽量做到地道)。所以像这样的东西应该可以完成工作:
aa_stats_stream_w_timestamps.keyBy[String](value => value.set1.sEntId)
更新:
对于组合键大小写,使用
aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))