Continuous trigger not found in Structured Streaming

Runtime: Spark 2.3.0, Scala 2.11 (Databricks 4.1 ML beta)

    import org.apache.spark.sql.streaming.{OutputMode, Trigger}
    import scala.concurrent.duration._

    // Kafka settings and df definition go here

    val query = df.writeStream.format("parquet")
      .option("path", ...)
      .option("checkpointLocation", ...)
      .trigger(continuous(30000))
      .outputMode(OutputMode.Append)
      .start

Throws error: value continuous not found

Other attempts that did not work:

    .trigger(continuous = "30 seconds") // as per Databricks blog
    // throws same error as above

    .trigger(Trigger.Continuous("1 second")) // as per Spark docs
    // throws java.lang.IllegalStateException: Unknown type of trigger: ContinuousTrigger(1000)

References:

(Databricks blog) https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html

(Spark guide) http://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html#continuous-processing

(Scaladoc) https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.streaming.package

Spark 2.3.0 does not support the parquet sink in continuous mode. You have to write to a Kafka, console, or memory sink instead.

To quote the continuous processing mode in Structured Streaming blog post:

You can set the optional Continuous Trigger in queries that satisfy the following conditions: Read from supported sources like Kafka and write to supported sinks like Kafka, memory, console.
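As a rough illustration of a query that meets those conditions, here is a minimal sketch that reads from the built-in rate source and writes to the console sink with a continuous trigger. The object name, app name, local master setting, and option values are my own choices for the sketch and do not come from the question; it assumes Spark 2.3 or later is on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Hypothetical object name, used only for this sketch.
object ContinuousConsoleSketch {

  // Starts a continuous query: rate source -> console sink.
  def start(spark: SparkSession): StreamingQuery = {
    // The rate source generates test rows and is usable in continuous mode.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .option("numPartitions", "2")
      .load()

    df.writeStream
      .format("console")                        // console is a supported continuous sink
      .outputMode("append")
      .trigger(Trigger.Continuous("1 second"))  // the argument is the checkpoint interval
      .start()
  }

  def main(args: Array[String]): Unit = {
    // Continuous mode keeps one long-running task per partition,
    // so the local master needs at least as many cores as partitions.
    val spark = SparkSession.builder()
      .appName("continuous-console-sketch")
      .master("local[4]")
      .getOrCreate()

    val query = start(spark)
    query.awaitTermination(10000) // let it run for about ten seconds
    query.stop()
    spark.stop()
  }
}
```

Swapping format("console") for format("parquet") in this sketch should bring back the failure described in the question.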

Try using trigger(Trigger.ProcessingTime("1 second"))

This should work: I ran into the same problem and this resolved it.
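For the parquet sink from the question, the micro-batch version might look like the sketch below. The rate source stands in for the Kafka-backed df (its definition is not shown in the question), and the output and checkpoint directories are temporary folders created purely for illustration; the object name and the 30-second interval are assumptions as well.

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}

// Hypothetical object name, used only for this sketch.
object MicroBatchParquetSketch {

  // Starts a micro-batch query: rate source -> parquet files.
  def start(spark: SparkSession, outputDir: String, checkpointDir: String): StreamingQuery = {
    // Placeholder source; substitute the real Kafka-backed DataFrame here.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    df.writeStream
      .format("parquet")
      .option("path", outputDir)
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.ProcessingTime("30 seconds")) // micro-batch trigger instead of continuous
      .outputMode(OutputMode.Append())
      .start()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("microbatch-parquet-sketch")
      .master("local[2]")
      .getOrCreate()

    // Temporary directories, chosen only so the sketch runs anywhere.
    val outputDir = Files.createTempDirectory("parquet-out").toString
    val checkpointDir = Files.createTempDirectory("parquet-chk").toString

    val query = start(spark, outputDir, checkpointDir)
    query.awaitTermination(60000) // run for about a minute, then shut down
    query.stop()
    spark.stop()
  }
}
```

The trade-off is latency: a processing-time trigger writes one batch per interval, whereas continuous mode is aimed at millisecond-level latency.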

As the Spark code below shows, only a sink that implements the StreamWriteSupport interface can be used with ContinuousTrigger.

    (sink, trigger) match {
      case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
        new StreamingQueryWrapper(new ContinuousExecution(
          sparkSession,
          userSpecifiedName.orNull,
          checkpointLocation,
          analyzedPlan,
          v2Sink,
          trigger,
          triggerClock,
          outputMode,
          extraOptions,
          deleteCheckpointOnStop))
      case _ =>
        new StreamingQueryWrapper(new MicroBatchExecution(
          sparkSession,
          userSpecifiedName.orNull,
          checkpointLocation,
          analyzedPlan,
          sink,
          trigger,
          triggerClock,
          outputMode,
          extraOptions,
          deleteCheckpointOnStop))
    }

And only three sinks implement this interface: ConsoleSinkProvider, KafkaSourceProvider, and MemorySinkV2.
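To make the fallback easier to see, here is a toy model of that dispatch in plain Scala. None of these types are Spark's real classes; they are simplified stand-ins with similar names. The assumption that the micro-batch path rejects a continuous trigger with an IllegalStateException is inferred from the error message in the question, not copied from the Spark source.

```scala
// Toy stand-ins for Spark's trigger types (not the real classes).
sealed trait Trigger
final case class ProcessingTime(intervalMs: Long) extends Trigger
final case class ContinuousTrigger(intervalMs: Long) extends Trigger

// Toy stand-ins for sinks; StreamWriteSupport marks continuous-capable sinks.
trait Sink
trait StreamWriteSupport
object ConsoleSink extends Sink with StreamWriteSupport
object ParquetSink extends Sink

object Dispatch {
  // Mirrors the shape of the match above: the continuous engine is chosen
  // only when BOTH the sink and the trigger qualify.
  def chooseEngine(sink: Sink, trigger: Trigger): String =
    (sink, trigger) match {
      case (_: StreamWriteSupport, _: ContinuousTrigger) => "continuous"
      case _                                             => microBatch(trigger)
    }

  // Assumed behaviour of the micro-batch path: it only understands
  // micro-batch triggers and rejects anything else.
  private def microBatch(trigger: Trigger): String = trigger match {
    case ProcessingTime(_) => "micro-batch"
    case other =>
      throw new IllegalStateException(s"Unknown type of trigger: $other")
  }
}
```

In this model, Dispatch.chooseEngine(ParquetSink, ContinuousTrigger(1000)) falls through to the micro-batch branch and throws, which is consistent with the "Unknown type of trigger: ContinuousTrigger(1000)" error reported for the parquet sink.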

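In Spark 3.0.1, continuous processing mode is still experimental and supports only specific types of queries, depending on the source and sink.

According to the documentation on Continuous Processing, the following queries are supported, and writing parquet does not appear to be among them: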

As of Spark 2.4, only the following type of queries are supported in the continuous processing mode.

Operations: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (select, map, flatMap, mapPartitions, etc.) and selections (where, filter, etc.).
   All SQL functions are supported except aggregation functions (since aggregations are not yet supported), current_timestamp() and current_date() (deterministic computations using time is challenging).
Sources:
   Kafka source: All options are supported.
   Rate source: Good for testing. Only options that are supported in the continuous mode are numPartitions and rowsPerSecond.
Sinks:
   Kafka sink: All options are supported.
   Memory sink: Good for debugging.
   Console sink: Good for debugging. All options are supported. Note that the console will print every checkpoint interval that you have specified in the continuous trigger.