KeyedProcessFunction 的 processFunction 中的上下文为空
Context in processFunction for KeyedProcessFunction is null
我正在尝试使用 KeyedProcessFunction,但 KeyedProcessFunction
中 processFunction
中的 ctx: Context
变量返回空值。请注意,我使用的是默认值 TimeCharacteristic
,即 ProcessingTime
(所以我什至没有设置它)。
我在 Whosebug 上找到了 ,但那个与 EventTime 有关,而不是 ProcessingTime。
按照 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example 的确切示例,我使用 Scala 2.11.12 和 Flink 1.10 创建了以下内容,但我仍然遇到相同的错误。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object example {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// the source data stream
val stream = env.socketTextStream("localhost", 9999).map(x => {
var splitCsv = x.stripLineEnd.split(",")
(splitCsv(0), splitCsv(1))
}
)
// apply the process function onto a keyed stream
val result: DataStream[Tuple2[String, Long]] = stream
.keyBy(0)
.process(new CountWithTimeoutFunction())
result.print()
env.execute("Flink Streaming Demo STDOUT")
}
/**
* The data type stored in the state
*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
*/
class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {
/** The state that is maintained by this process function */
lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
override def processElement(
value: (String, String),
ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context,
out: Collector[(String, Long)]): Unit = {
// initialize or retrieve/update the state
val current: CountWithTimestamp = state.value match {
case null =>
CountWithTimestamp(value._1, 1, ctx.timestamp)
case CountWithTimestamp(key, count, lastModified) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
}
// write the state back
state.update(current)
// schedule the next timer 60 seconds from the current event time
ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
}
override def onTimer(
timestamp: Long,
ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext,
out: Collector[(String, Long)]): Unit = {
state.value match {
case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
out.collect((key, count))
case _ =>
}
}
}
}
这是错误:
Caused by: java.lang.NullPointerException at
scala.Predef$.Long2long(Predef.scala:363) at
com.leidos.example$CountWithTimeoutFunction.processElement(example.scala:57)
at
com.leidos.example$CountWithTimeoutFunction.processElement(example.scala:42)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at
java.lang.Thread.run(Thread.java:748)
知道我做错了什么吗?
提前致谢!
问题是您在第 57 行访问 Context
的 timestamp
字段。如果您使用 ProcessingTime
或在使用 EventTime
.
时未指定时间戳提取器,则此字段为 null
我正在尝试使用 KeyedProcessFunction,但 KeyedProcessFunction
中 processFunction
中的 ctx: Context
变量返回空值。请注意,我使用的是默认值 TimeCharacteristic
,即 ProcessingTime
(所以我什至没有设置它)。
我在 Whosebug 上找到了
按照 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example 的确切示例,我使用 Scala 2.11.12 和 Flink 1.10 创建了以下内容,但我仍然遇到相同的错误。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object example {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// the source data stream
val stream = env.socketTextStream("localhost", 9999).map(x => {
var splitCsv = x.stripLineEnd.split(",")
(splitCsv(0), splitCsv(1))
}
)
// apply the process function onto a keyed stream
val result: DataStream[Tuple2[String, Long]] = stream
.keyBy(0)
.process(new CountWithTimeoutFunction())
result.print()
env.execute("Flink Streaming Demo STDOUT")
}
/**
* The data type stored in the state
*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
*/
class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {
/** The state that is maintained by this process function */
lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
override def processElement(
value: (String, String),
ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context,
out: Collector[(String, Long)]): Unit = {
// initialize or retrieve/update the state
val current: CountWithTimestamp = state.value match {
case null =>
CountWithTimestamp(value._1, 1, ctx.timestamp)
case CountWithTimestamp(key, count, lastModified) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
}
// write the state back
state.update(current)
// schedule the next timer 60 seconds from the current event time
ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
}
override def onTimer(
timestamp: Long,
ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext,
out: Collector[(String, Long)]): Unit = {
state.value match {
case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
out.collect((key, count))
case _ =>
}
}
}
}
这是错误:
Caused by: java.lang.NullPointerException at scala.Predef$.Long2long(Predef.scala:363) at com.leidos.example$CountWithTimeoutFunction.processElement(example.scala:57) at com.leidos.example$CountWithTimeoutFunction.processElement(example.scala:42) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748)
知道我做错了什么吗? 提前致谢!
问题是您在第 57 行访问 Context
的 timestamp
字段。如果您使用 ProcessingTime
或在使用 EventTime
.
null