编译错误 foreachBatch 不是 DataStreamWriter 的成员,即使它在 spark-shell 上有效

Compile error foreachBatch is not a member of DataStreamWriter even though on spark-shell it works

我正在尝试将 foreachBatch 与 spark 结构化流媒体一起使用。我在 spark-shell 控制台上尝试了代码,它没有任何问题,但是当我尝试编译代码时,我遇到了以下错误。

value foreachBatch is not a member of org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] [error] possible cause: maybe a semicolon is missing before `value foreachBatch'? [error] .foreachBatch { (batchDf: DataFrame, batchId: Long) => batchDf

我的代码是这样的。

val query = finalStream
  .writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) => batchDf
      .write
      .format("com.databricks.spark.redshift")
      .option("url", StreamingCfg.redshiftJdbcUrl)
      .option("dbtable", redshiftTableName)
      .option("aws_iam_role", StreamingCfg.redshiftARN)
      .option("tempdir", redshiftTempDir)
      .mode(SaveMode.Append)
      .save()

    batchDf
      .write
      .mode(SaveMode.Append)
      .partitionBy("date_key", "hour")
      .parquet(outputLocation);
  }
  .trigger(Trigger.ProcessingTime(aggregationTime.seconds))
  .option("checkpointLocation", checkPointingLocation)
  .start()

有人知道我在这里错过了什么吗?

更多关于我正在做的事情, 从 kafka 读取两个流 -> 对它们进行流-流连接 -> 同时将其写入 redshift 和 S3。 谢谢。

试试这样使用它:

finalStream
  .writeStream
  .foreachBatch( (batchDF: DataFrame, batchId: Long ) => {
      
  })

如果它在 spark-shell 中有效,您应该仔细检查工作(开发)环境中的依赖项。确保它能够加载所有 spark 依赖项并使用正确的版本。