Flink ProcessWindowFunction 编译报错
Flink ProcessWindowFunction Compilation Error
import com.typesafe.config.ConfigFactory
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.project.async.core.job.FlinkKafkaConnector
import org.project.functions.TestWindowFunction
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction // USED SCALA API
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val source = env.fromElements(("hello", 1), ("hello", 2))
val window1 = source
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.evictor(CountEvictor.of(2))
.process(new TestWindowFunction)
====TestWindowFunction.scala=====
class TestWindowFunction
extends ProcessWindowFunction[(String, Int), (String, String, Int), String, TimeWindow] {
override def process(
key: String,
window: Context,
input: Iterable[(String, Int)],
out: Collector[(String, String, Int)]): Unit = {
input.foreach(e => out.collect((e._1, e._1, e._2)))
}
}
=================ERROR===============
Task.scala:47: error: type mismatch;
[ERROR] found : org.project.functions.TestWindowFunction
[ERROR] required: org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction[(String, Int),?,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
[ERROR] .process(new TestWindowFunction)
[ERROR] ^
[ERROR] one error found
我正在尝试使用 window 功能来收集某个特定时间间隔内的事件,但我遇到了这个编译错误,我无法解决这个问题,我也参考了下面这个问题我还是没能解决。
Link: Apache Flink: ProcessWindowFunction implementation
这段代码可能有什么问题?
Flink 版本:1.10.0
Scala 需要这些导入
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
你不应该使用
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
当遇到这种情况时,我的解决方案是删除所有导入,然后让 IntelliJ 帮助我(小心地)将它们添加回去。
有一个类似的问题 - 在我的例子中,它是由扩展错误的 class.
的 window 函数实现引起的
// didn't work:
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
更新导入解决了问题。
// worked:
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import com.typesafe.config.ConfigFactory
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.project.async.core.job.FlinkKafkaConnector
import org.project.functions.TestWindowFunction
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction // USED SCALA API
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val source = env.fromElements(("hello", 1), ("hello", 2))
val window1 = source
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.evictor(CountEvictor.of(2))
.process(new TestWindowFunction)
====TestWindowFunction.scala=====
class TestWindowFunction
extends ProcessWindowFunction[(String, Int), (String, String, Int), String, TimeWindow] {
override def process(
key: String,
window: Context,
input: Iterable[(String, Int)],
out: Collector[(String, String, Int)]): Unit = {
input.foreach(e => out.collect((e._1, e._1, e._2)))
}
}
=================ERROR===============
Task.scala:47: error: type mismatch;
[ERROR] found : org.project.functions.TestWindowFunction
[ERROR] required: org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction[(String, Int),?,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
[ERROR] .process(new TestWindowFunction)
[ERROR] ^
[ERROR] one error found
我正在尝试使用 window 功能来收集某个特定时间间隔内的事件,但我遇到了这个编译错误,我无法解决这个问题,我也参考了下面这个问题我还是没能解决。
Link: Apache Flink: ProcessWindowFunction implementation
这段代码可能有什么问题?
Flink 版本:
Scala 需要这些导入
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
你不应该使用
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
当遇到这种情况时,我的解决方案是删除所有导入,然后让 IntelliJ 帮助我(小心地)将它们添加回去。
有一个类似的问题 - 在我的例子中,它是由扩展错误的 class.
的 window 函数实现引起的// didn't work:
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
更新导入解决了问题。
// worked:
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction