Spark Structured Streaming not able to see the record details
I am trying to process records from a read stream and print each row, but I cannot see any printed statements in either the driver or executor logs. What could be wrong?
- For each record or batch (ideally), I want to print the message
- For each batch, I want to run a process.
val kafka = spark.readStream
  .format("kafka")
  .option("maxOffsetsPerTrigger", MAX_OFFSETS_PER_TRIGGER)
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
  .option("subscribe", topic) // comma-separated list of topics
  .option("startingOffsets", "earliest")
  .option("checkpointLocation", CHECKPOINT_LOCATION)
  .option("failOnDataLoss", "false")
  .option("minPartitions", sys.env.getOrElse("MIN_PARTITIONS", "64").toInt)
  .load()
import spark.implicits._
println("JSON output to write into sink")

val consoleOutput = kafka.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
  //.select(from_json($"json", schema) as "data")
  //.select("data.*")
  //.select(get_json_object(($"value").cast("string"), "$").alias("body"))
  .writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, epochId: Long): Boolean = true
    override def process(row: Row): Unit = {
      logger.info(s"Record received in data frame is -> ${row.mkString}")
      runProcess() // Want to run some process every microbatch
    }
    override def close(errorOrNull: Throwable): Unit = {}
  })
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

consoleOutput.awaitTermination()
I copied your code and it ran fine once the runProcess call was removed. Note also that the query chains both .foreach(...) and .format("console"): a streaming query writes to exactly one sink, so only one of those two can take effect. If you intend to do two different things, I suggest two separate queries after selecting the relevant fields from the Kafka topic:
val kafkaSelection = kafka.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
1. For each record or batch (ideally), I want to print the message
val query1 = kafkaSelection
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .option("checkpointLocation", CHECKPOINT_LOCATION1)
  .start()
2. For each batch, I want to run a process.
val query2 = kafkaSelection
  .writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, epochId: Long): Boolean = true
    override def process(row: Row): Unit = {
      logger.info(s"Record received in data frame is -> ${row.mkString}")
      runProcess() // Want to run some process every microbatch
    }
    override def close(errorOrNull: Throwable): Unit = {}
  })
  .outputMode("append")
  .option("checkpointLocation", CHECKPOINT_LOCATION2)
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
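Side note: since requirement 2 is really "run something once per micro-batch" rather than once per record, Spark 2.4+ also offers foreachBatch, which hands each micro-batch to you as a DataFrame. A minimal sketch as an alternative to query2 (runProcess and CHECKPOINT_LOCATION2 as above; the explicitly typed function value sidesteps an overload ambiguity under Scala 2.12):

import org.apache.spark.sql.DataFrame

// Runs once per micro-batch on the driver, not once per record.
val processBatch = (batchDF: DataFrame, batchId: Long) => {
  logger.info(s"Micro-batch $batchId contains ${batchDF.count()} records")
  runProcess()
}

val query2Alt = kafkaSelection
  .writeStream
  .foreachBatch(processBatch)
  .option("checkpointLocation", CHECKPOINT_LOCATION2)
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()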
Also note that I have set a separate checkpoint location for each query, which ensures consistent tracking of the Kafka offsets. Make sure each query gets its own distinct checkpoint location. The two queries run in parallel.
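For illustration, the two locations might look like the hypothetical paths below (in production, use durable storage such as HDFS or S3):

// Hypothetical paths - each query must get its own directory.
val CHECKPOINT_LOCATION1 = "/tmp/checkpoints/console-query"
val CHECKPOINT_LOCATION2 = "/tmp/checkpoints/foreach-query"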
It is important to define both queries before waiting for their termination:
query1.awaitTermination()
query2.awaitTermination()
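If you would rather stop as soon as either query terminates (for example, on an error), the StreamingQueryManager offers a variant that waits on all active queries at once:

// Blocks until any active streaming query terminates.
spark.streams.awaitAnyTermination()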
Tested with Spark 2.4.5.