Spark Structured Stream null in output dataset

I am running a Scala job that aggregates data and prints the output to the console. Unfortunately, after the group operation I get null values. Current output:

| ID   | Date | Count |
|------|------|-------|
| null | null | 35471 |

I realized the bottleneck is the grouping step: whenever I group on a column other than a numeric one, the output returns null. Any suggestions are welcome; I have spent a lot of time looking for a solution.

My code:

// create schema (StructType, IntegerType etc. come from org.apache.spark.sql.types._)
val sensorsSchema = new StructType()
  .add("SensorId", IntegerType)
  .add("Timestamp", TimestampType)
  .add("Value", DoubleType)
  .add("State", StringType)

// read streaming data from csv...

// aggregate streaming data (to_date and unix_timestamp come from org.apache.spark.sql.functions._)
val streamAgg = streamIn
  .withColumn("Date", to_date(unix_timestamp($"Timestamp", "dd/MM/yyyy").cast(TimestampType)))
  .groupBy("SensorId", "Date")
  .count()

// write streaming data...
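
(The read and write steps are elided above. For reference, printing a streaming aggregation like streamAgg to the console needs outputMode("complete") or "update"; the sketch below shows roughly what such a write step could look like. The query variable name and options are assumptions, not the original code.)

// possible console sink for streamAgg (illustrative sketch, not the original code);
// a streaming groupBy/count requires outputMode "complete" or "update"
val consoleOut = streamAgg.writeStream
  .outputMode("complete")       // re-emit the full aggregated table on each trigger
  .format("console")
  .option("truncate", "false")  // print full column values
  .start()

consoleOut.awaitTermination()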

I changed the code, and now it works perfectly:

/****************************************
* STREAMING APP
* 1.0 beta
*****************************************
* read data from csv (local)
* and save as parquet (local)
****************************************/

package tk.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql._
// import org.apache.spark.sql.functions._

case class SensorsSchema(SensorId: Int, Timestamp: String, Value: Double, State: String, OperatorId: Int)


object Runner {

  def main(args: Array[String]): Unit = {

    // Configuration parameters (to create spark session and contexts)
    val appName = "StreamingApp" // app name
    val master = "local[*]" // master configuration
    val dataDir = "/home/usr_spark/Projects/SparkStreaming/data"
    val refreshInterval = 30 // seconds


    // initialize context
    val conf = new SparkConf().setMaster(master).setAppName(appName)
    val spark = SparkSession.builder.config(conf).getOrCreate()


    import spark.implicits._

    // TODO change file source to Kafka (must)

    // read streaming data
    val sensorsSchema = Encoders.product[SensorsSchema].schema
    val streamIn = spark.readStream
      .format("csv")
      .schema(sensorsSchema)
      .load(dataDir + "/input")
      .select("SensorId", "Timestamp", "State", "Value") // remove "OperatorId" column


    // TODO save result in S3 (nice to have)

    // write streaming data (trigger a new micro-batch every refreshInterval seconds)
    import org.apache.spark.sql.streaming.Trigger
    val streamOut = streamIn.writeStream
      .queryName("streamingOutput")
      .format("parquet")
      .option("checkpointLocation", dataDir + "/output/checkpoint")
      .option("path", dataDir + "/output")
      .trigger(Trigger.ProcessingTime(s"$refreshInterval seconds"))
      .start()

    streamOut.awaitTermination() // block until the streaming query terminates

  }
}
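
As a quick sanity check on the result, the parquet output can be read back as a static DataFrame and aggregated in batch mode the way the original streaming query intended. A minimal sketch, assuming spark and dataDir from the program above are in scope and using the same date format as the first attempt:

// batch check of the parquet output (sketch; assumes spark and dataDir are in scope)
import org.apache.spark.sql.functions.{col, to_date}

val parquetDf = spark.read.parquet(dataDir + "/output")

// same aggregation as the original streaming attempt, but on static data,
// which makes it easy to verify that SensorId and Date come out non-null
parquetDf
  .withColumn("Date", to_date(col("Timestamp"), "dd/MM/yyyy"))
  .groupBy("SensorId", "Date")
  .count()
  .show()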