为什么Dataset.mapPartitions后没有输出(下游流处理停止数据)?

Why no output after Dataset.mapPartitions (stopping data in downstream stream processing)?

我正在使用 Spark Structured Streaming,我遇到了 mapPartitions 的问题。

如果我们在 mapPartitions 中注释 foreach 操作,它工作得很好。

spark ui 正在显示作业完成

stream
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .map { data => MetricDataFilter.filter_metric_data(data._1, data._2) }
  .mapPartitions { partition =>
    val lagged = partition.filter { x => x.metric_type == "n" }
    lagged.foreach { x =>
      val diff = Instant.now().toEpochMilli - x.timestamp.getTime
      println(x.toString)
      println(diff)
    }
    partition
  }
  .filter($"metric_type" === "c")
  .withWatermark("timestamp", "5 minutes")
  .groupBy(
    window($"timestamp", "30 seconds", "30 seconds"),
    $"metric_name", $"timestamp")
  .sum("metric_value").as("sum_metric_value")
  .writeStream
  .queryName("countMetricQuery")
  .outputMode(OutputMode.Update)
  .format("console")
  .start

删除 mapPartitions 中的 foreach 效果很好。

lagged.foreach {
              x =>
                val diff = Instant.now().toEpochMilli - x.timestamp.getTime
                diff > 0 match {
                  case false =>
                    println(x.toString)
                  case true => println(x.toString)
                }
                println(diff)

            }

这是一个很难发现的问题,这取决于 Iterator 在 Scala 中的工作方式。

以下是 mapPartitions 的签名,表示您与 Iterator[T] 一起工作。

mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]

Returns a new Dataset that contains the result of applying func to each partition.

scala.Iterator 是单通道数据结构,因此一旦所有元素都被消耗,它们就消失了。

看看你自己:

scala> val it = Seq(1,2,3).toIterator
it: Iterator[Int] = <iterator>

scala> it.foreach(println)
1
2
3

scala> it.foreach(println)

如您所见,第二个 foreach 中没有打印任何内容,因为没有剩余元素可供使用。

换句话说,您的流式查询在 mapPartitions 之后没有数据要处理,因此没有输出。