如何使用结构化的火花流将镶木地板批量发送到卡夫卡?

How to send parquet to kafka in batches using strcutured spark streaming?

我正在读取 parquet 文件并将其转换为 JSON 格式,然后发送到 kafka。问题是,它读取了整个镶木地板,所以一次性发送给 kafka,但我想逐行或批量发送 json 数据:

object WriteParquet2Kafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("yarn")
      .appName("Write Parquet to Kafka")
      .getOrCreate()

    import spark.implicits._
    val ds: DataFrame = spark.readStream
      .schema(parquet-schema)
      .parquet(path-to-parquet-file)


    val df: DataFrame = ds.select($"vin" as "key", to_json( struct( ds.columns.map(col(_)):_*  ) ) as "value" )
      .filter($"key" isNotNull)

    val ddf = df
      .writeStream
      .format("kafka")
      .option("topic", topics)
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("checkpointLocation", "/tmp/test")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    ddf.awaitTermination()
  }
}

可以这样做吗?

我终于想出如何解决我的问题了,只需添加一个option并为maxFilesPerTrigger设置一个合适的数字:

    val df: DataFrame = spark
      .readStream
      .option("maxFilesPerTrigger", 1)
      .schema(parquetSchema)
      .parquet(parqurtUri)

注意:maxFilesPerTrigger必须设置为1,这样每个parquet文件都被读取。