编译错误 foreachBatch 不是 DataStreamWriter 的成员,即使它在 spark-shell 上有效
Compile error foreachBatch is not a member of DataStreamWriter even though on spark-shell it works
我正在尝试将 foreachBatch
与 spark 结构化流媒体一起使用。我在 spark-shell
控制台上尝试了代码,它没有任何问题,但是当我尝试编译代码时,我遇到了以下错误。
value foreachBatch is not a member of org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
[error] possible cause: maybe a semicolon is missing before `value foreachBatch'?
[error] .foreachBatch { (batchDf: DataFrame, batchId: Long) => batchDf
我的代码是这样的。
val query = finalStream
.writeStream
.foreachBatch { (batchDf: DataFrame, batchId: Long) => batchDf
.write
.format("com.databricks.spark.redshift")
.option("url", StreamingCfg.redshiftJdbcUrl)
.option("dbtable", redshiftTableName)
.option("aws_iam_role", StreamingCfg.redshiftARN)
.option("tempdir", redshiftTempDir)
.mode(SaveMode.Append)
.save()
batchDf
.write
.mode(SaveMode.Append)
.partitionBy("date_key", "hour")
.parquet(outputLocation);
}
.trigger(Trigger.ProcessingTime(aggregationTime.seconds))
.option("checkpointLocation", checkPointingLocation)
.start()
有人知道我在这里错过了什么吗?
更多关于我正在做的事情,
从 kafka 读取两个流 -> 对它们进行流-流连接 -> 同时将其写入 redshift 和 S3。
谢谢。
试试这样使用它:
finalStream
.writeStream
.foreachBatch( (batchDF: DataFrame, batchId: Long ) => {
})
如果它在 spark-shell 中有效,您应该仔细检查工作(开发)环境中的依赖项。确保它能够加载所有 spark 依赖项并使用正确的版本。
我正在尝试将 foreachBatch
与 spark 结构化流媒体一起使用。我在 spark-shell
控制台上尝试了代码,它没有任何问题,但是当我尝试编译代码时,我遇到了以下错误。
value foreachBatch is not a member of org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] [error] possible cause: maybe a semicolon is missing before `value foreachBatch'? [error] .foreachBatch { (batchDf: DataFrame, batchId: Long) => batchDf
我的代码是这样的。
val query = finalStream
.writeStream
.foreachBatch { (batchDf: DataFrame, batchId: Long) => batchDf
.write
.format("com.databricks.spark.redshift")
.option("url", StreamingCfg.redshiftJdbcUrl)
.option("dbtable", redshiftTableName)
.option("aws_iam_role", StreamingCfg.redshiftARN)
.option("tempdir", redshiftTempDir)
.mode(SaveMode.Append)
.save()
batchDf
.write
.mode(SaveMode.Append)
.partitionBy("date_key", "hour")
.parquet(outputLocation);
}
.trigger(Trigger.ProcessingTime(aggregationTime.seconds))
.option("checkpointLocation", checkPointingLocation)
.start()
有人知道我在这里错过了什么吗?
更多关于我正在做的事情, 从 kafka 读取两个流 -> 对它们进行流-流连接 -> 同时将其写入 redshift 和 S3。 谢谢。
试试这样使用它:
finalStream
.writeStream
.foreachBatch( (batchDF: DataFrame, batchId: Long ) => {
})
如果它在 spark-shell 中有效,您应该仔细检查工作(开发)环境中的依赖项。确保它能够加载所有 spark 依赖项并使用正确的版本。