Flink:应用在 KeyedStream 上的 RichMapFunction 中的 Access Key 来处理由 Option 键控的流
Flink: Access Key in RichMapFunction applied on KeyedStream to handle stream keyed by Option
当我将 RichMapFunction
应用于键控流时,我想将 None
作为关键案例处理。
例如我有这样一个案例class:
case class Foo(a: Option[String], b: Int, acc: Option[Int] = None)
acc
是我想用 map
.
计算的字段
我想在流上应用状态映射,所以我有一个 RichMapFunction
(例如它是一个累加器):
class Accumulator extends RichMapFunction[Foo, Foo] {
private var sum: ValueState[Int] = _
override def map(input: Foo): Foo = {
val newAcc = Option(sum.value()) match {
case None => input.b
case Some(x) => x + input.b
}
sum.update(newAcc)
Foo(input.a, input.b, Some(newAcc))
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[Int]("accumulator", createTypeInformation[Int])
)
}
}
然后,我的管道执行:
object ExampleAccumulator extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(Foo(Some("a"), 1, None), Foo(Some("a"), 2, None), Foo(None, 10, None), Foo(None, 6, None))
.keyBy(_.a)
.map(new Accumulator())
.print()
env.execute("ExampleAccumulator")
}
输出是:
Foo(Some(a),1,Some(1))
Foo(Some(a),2,Some(3))
Foo(None,10,Some(10))
Foo(None,6,Some(16))
但是我想在 acc
中得到 None
当键是 None
.
是否可以在RichMapFunction中获取key?
暂不支持。
key可以通过KeyContextclass中的getCurrentKey()方法获取,RichMapFunction中没有暴露。但是,Flink 内部提供了 KeyedProcessFunction,可以 return key 入参数 Context。我相信这就是你想要的。
您可以通过 Foo
的值访问键,通过 KeySelector
API,Scala 解决方法:
val selector = scalaKeyedStream
.javaStream
.asInstanceOf[org.apache.flink.streaming.api.datastream.KeyedStream]
.getKeySelector
scalaKeyedStream.map(in => selector.getKey(in))
您需要将 Scala Stream 转换为 Java,因为 Scala API、details.
中没有 getKeySelector
方法
当我将 RichMapFunction
应用于键控流时,我想将 None
作为关键案例处理。
例如我有这样一个案例class:
case class Foo(a: Option[String], b: Int, acc: Option[Int] = None)
acc
是我想用 map
.
我想在流上应用状态映射,所以我有一个 RichMapFunction
(例如它是一个累加器):
class Accumulator extends RichMapFunction[Foo, Foo] {
private var sum: ValueState[Int] = _
override def map(input: Foo): Foo = {
val newAcc = Option(sum.value()) match {
case None => input.b
case Some(x) => x + input.b
}
sum.update(newAcc)
Foo(input.a, input.b, Some(newAcc))
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[Int]("accumulator", createTypeInformation[Int])
)
}
}
然后,我的管道执行:
object ExampleAccumulator extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(Foo(Some("a"), 1, None), Foo(Some("a"), 2, None), Foo(None, 10, None), Foo(None, 6, None))
.keyBy(_.a)
.map(new Accumulator())
.print()
env.execute("ExampleAccumulator")
}
输出是:
Foo(Some(a),1,Some(1))
Foo(Some(a),2,Some(3))
Foo(None,10,Some(10))
Foo(None,6,Some(16))
但是我想在 acc
中得到 None
当键是 None
.
是否可以在RichMapFunction中获取key?
暂不支持。 key可以通过KeyContextclass中的getCurrentKey()方法获取,RichMapFunction中没有暴露。但是,Flink 内部提供了 KeyedProcessFunction,可以 return key 入参数 Context。我相信这就是你想要的。
您可以通过 Foo
的值访问键,通过 KeySelector
API,Scala 解决方法:
val selector = scalaKeyedStream
.javaStream
.asInstanceOf[org.apache.flink.streaming.api.datastream.KeyedStream]
.getKeySelector
scalaKeyedStream.map(in => selector.getKey(in))
您需要将 Scala Stream 转换为 Java,因为 Scala API、details.
中没有getKeySelector
方法