KeyedProcessFunction 的 processFunction 中的上下文为空

Context in processFunction for KeyedProcessFunction is null

我正在尝试使用 KeyedProcessFunction,但 KeyedProcessFunctionprocessFunction 中的 ctx: Context 变量返回空值。请注意,我使用的是默认值 TimeCharacteristic,即 ProcessingTime(所以我什至没有设置它)。

我在 Whosebug 上找到了 ,但那个与 EventTime 有关,而不是 ProcessingTime。

按照 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example 的确切示例,我使用 Scala 2.11.12 和 Flink 1.10 创建了以下内容,但我仍然遇到相同的错误。

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment


object example {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // the source data stream
    val stream = env.socketTextStream("localhost", 9999).map(x => {
      var splitCsv = x.stripLineEnd.split(",")
      (splitCsv(0), splitCsv(1))
    }
    )

    // apply the process function onto a keyed stream
    val result: DataStream[Tuple2[String, Long]] = stream
      .keyBy(0)
      .process(new CountWithTimeoutFunction())

    result.print()

    env.execute("Flink Streaming Demo STDOUT")

  }

  /**
   * The data type stored in the state
   */
  case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

  /**
   * The implementation of the ProcessFunction that maintains the count and timeouts
   */
  class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {

    /** The state that is maintained by this process function */
    lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
      .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))


    override def processElement(
                                 value: (String, String),
                                 ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context,
                                 out: Collector[(String, Long)]): Unit = {

      // initialize or retrieve/update the state
      val current: CountWithTimestamp = state.value match {
        case null =>
          CountWithTimestamp(value._1, 1, ctx.timestamp)
        case CountWithTimestamp(key, count, lastModified) =>
          CountWithTimestamp(key, count + 1, ctx.timestamp)
      }

      // write the state back
      state.update(current)

      // schedule the next timer 60 seconds from the current event time
      ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
    }

    override def onTimer(
                          timestamp: Long,
                          ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext,
                          out: Collector[(String, Long)]): Unit = {

      state.value match {
        case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
          out.collect((key, count))
        case _ =>
      }
    }
  }
}

这是错误:

Caused by: java.lang.NullPointerException at scala.Predef$.Long2long(Predef.scala:363) at com.leidos.example$CountWithTimeoutFunction.processElement(example.scala:57) at com.leidos.example$CountWithTimeoutFunction.processElement(example.scala:42) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748)

知道我做错了什么吗? 提前致谢!

问题是您在第 57 行访问 Contexttimestamp 字段。如果您使用 ProcessingTime 或在使用 EventTime.

时未指定时间戳提取器,则此字段为 null