Flink: How to convert the deprecated fold to aggregate?
I am following Flink's quickstart example: Monitoring the Wikipedia Edit Stream.
The example is in Java, and I implemented it in Scala as follows:
/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits.keyBy( _.getUser )
      .timeWindow(Time.seconds(5))
      .fold(("", 0L)) {
        (acc: (String, Long), event: WikipediaEditEvent) => {
          (event.getUser, acc._2 + event.getByteDiff)
        }
      }

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }
}
However, the fold function in Flink has been deprecated, and the aggregate function is recommended instead. But I have not found any example or tutorial on how to convert the deprecated fold to aggregate.
Any idea how to do this? It is probably not just a matter of swapping in aggregate.
Update
I have another implementation as follows:
/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits
      .map( e => UserWithEdits(e.getUser, e.getByteDiff) )
      .keyBy( "user" )
      .timeWindow(Time.seconds(5))
      .sum("edits")

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }

  /** Data type holding a user and an accumulated edit byte count */
  case class UserWithEdits(user: String, edits: Long)
}
I would also like to know how to use a custom implementation of AggregateFunction.
Update
I followed this documentation: AggregateFunction, but have the following question:
In the source code of the AggregateFunction interface for version 1.3, you will see that add indeed returns void:
void add(IN value, ACC accumulator);
But for the version 1.4 AggregateFunction, the accumulator is returned:
ACC add(IN value, ACC accumulator);
How should I handle this?
The Flink version I am using is 1.3.2. The documentation for this version does not cover AggregateFunction, and version 1.4 is not yet available in the artifactory.
You will find some documentation for AggregateFunction in the Flink 1.4 docs, including an example.
The version included in 1.3.2 is limited to use with mutable accumulator types, where the add operation modifies the accumulator. This has been fixed for Flink 1.4, but that release is not yet out.
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}

class SumAggregate extends AggregateFunction[WikipediaEditEvent, (String, Int), (String, Int)] {
  override def createAccumulator() = ("", 0)

  override def add(value: WikipediaEditEvent, accumulator: (String, Int)) =
    (value.getUser, value.getByteDiff + accumulator._2)

  override def getResult(accumulator: (String, Int)) = accumulator

  override def merge(a: (String, Int), b: (String, Int)) = (a._1, a._2 + b._2)
}

object WikipediaAnalysis extends App {
  val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val edits: DataStream[WikipediaEditEvent] = see.addSource(new WikipediaEditsSource())

  val result: DataStream[(String, Int)] = edits
    .keyBy(_.getUser)
    .timeWindow(Time.seconds(5))
    .aggregate(new SumAggregate)
    // .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))

  result.print()
  result.map(_.toString()).addSink(new FlinkKafkaProducer08[String]("localhost:9092", "wiki-result", new SimpleStringSchema()))

  see.execute("Wikipedia User Edit Volume")
}