如何使用结构化的火花流将镶木地板批量发送到卡夫卡?
How to send parquet to kafka in batches using strcutured spark streaming?
我正在读取 parquet 文件并将其转换为 JSON 格式,然后发送到 kafka。问题是,它读取了整个镶木地板,所以一次性发送给 kafka,但我想逐行或批量发送 json 数据:
object WriteParquet2Kafka {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder
.master("yarn")
.appName("Write Parquet to Kafka")
.getOrCreate()
import spark.implicits._
val ds: DataFrame = spark.readStream
.schema(parquet-schema)
.parquet(path-to-parquet-file)
val df: DataFrame = ds.select($"vin" as "key", to_json( struct( ds.columns.map(col(_)):_* ) ) as "value" )
.filter($"key" isNotNull)
val ddf = df
.writeStream
.format("kafka")
.option("topic", topics)
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/tmp/test")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
ddf.awaitTermination()
}
}
可以这样做吗?
我终于想出如何解决我的问题了,只需添加一个option
并为maxFilesPerTrigger
设置一个合适的数字:
val df: DataFrame = spark
.readStream
.option("maxFilesPerTrigger", 1)
.schema(parquetSchema)
.parquet(parqurtUri)
注意:maxFilesPerTrigger
必须设置为1,这样每个parquet文件都被读取。
我正在读取 parquet 文件并将其转换为 JSON 格式,然后发送到 kafka。问题是,它读取了整个镶木地板,所以一次性发送给 kafka,但我想逐行或批量发送 json 数据:
object WriteParquet2Kafka {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder
.master("yarn")
.appName("Write Parquet to Kafka")
.getOrCreate()
import spark.implicits._
val ds: DataFrame = spark.readStream
.schema(parquet-schema)
.parquet(path-to-parquet-file)
val df: DataFrame = ds.select($"vin" as "key", to_json( struct( ds.columns.map(col(_)):_* ) ) as "value" )
.filter($"key" isNotNull)
val ddf = df
.writeStream
.format("kafka")
.option("topic", topics)
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/tmp/test")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
ddf.awaitTermination()
}
}
可以这样做吗?
我终于想出如何解决我的问题了,只需添加一个option
并为maxFilesPerTrigger
设置一个合适的数字:
val df: DataFrame = spark
.readStream
.option("maxFilesPerTrigger", 1)
.schema(parquetSchema)
.parquet(parqurtUri)
注意:maxFilesPerTrigger
必须设置为1,这样每个parquet文件都被读取。