Unable to write results to a Kafka topic using Spark
My end goal is to write out and read the aggregated data to a new Kafka topic in batches. I followed the official documentation and a couple of other posts, but had no luck. I first read the topic, perform the aggregation, save the result in another Kafka topic, and then read that topic again and print it to the console. Below is my code:
package com.sparkKafka

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import scala.concurrent.duration._

object SparkKafkaTopic3 {
  def main(ar: Array[String]) {
    val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "songDemo5")
      .option("startingOffsets", "earliest")
      .load()
    import spark.implicits._
    df.printSchema()

    val newDf = df.select($"value".cast("string"), $"timestamp")
      .select(split(col("value"), ",")(0).as("userName"), split(col("value"), ",")(1).as("songName"), col("timestamp"))

    val windowedCount = newDf
      .withWatermark("timestamp", "40000 milliseconds")
      .groupBy(
        window(col("timestamp"), "20 seconds"), col("songName"))
      .agg(count(col("songName")).alias("numberOfTimes"))

    val outputTopic = windowedCount
      .select(struct("*").cast("string").as("value")) // Added this line.
      .writeStream
      .format("kafka")
      .option("topic", "songDemo6")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("checkpointLocation", "/tmp/spark_ss/")
      .start()

    val finalOutput = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "songDemo6").option("startingOffsets", "earliest")
      .load()
      .writeStream.format("console")
      .outputMode("append").start()

    spark.streams.awaitAnyTermination()
  }
}
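For reference, the Kafka source and sink above need the spark-sql-kafka connector on the classpath. A minimal build.sbt sketch could look like the following; the version numbers are assumptions and should match your Spark build:

// build.sbt sketch; versions below are placeholders, use the ones matching your cluster.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.5" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
)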
When I run this, I initially get the following exception in the console:
java.lang.IllegalStateException: Cannot find earliest offsets of Set(songDemo4-0). Some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
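As the message itself suggests, one way to keep the query from failing when old offsets are no longer available (for example because the topic was deleted and recreated) is to set failOnDataLoss on the Kafka source. A sketch based on the reader above; note that missing data is then silently skipped rather than surfaced as an error:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "songDemo5")
  .option("startingOffsets", "earliest")
  // Don't fail the query when previously seen offsets are no longer available;
  // the missing data is skipped instead of raising IllegalStateException.
  .option("failOnDataLoss", "false")
  .load()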
Also, if I run this code without the part that writes to the new topic and reads it back, everything works fine.
I tried reading the topic from the shell with the console consumer command, but it shows no records. Am I missing something here?
Below is my dataset:
>sid,Believer
>sid,Thunder
>sid,Stairway to heaven
>sid,Heaven
>sid,Heaven
>sid,thunder
>sid,Believer
After running @Srinivas's code and reading the new topic, I get data like the following:
[[2020-06-07 18:18:40, 2020-06-07 18:19:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Believer, 1]
[[2020-06-07 18:18:40, 2020-06-07 18:19:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Believer, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Stairway to heaven, 1]
[[2020-06-07 18:40:40, 2020-06-07 18:41:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Thunder, 1]
Here you can see that the window frame for Believer is the same, yet the entries are still separate. Why is that? It should be a single entry with a count of 2, since the window frame is the same.
Check the code below.
Add this: windowedCount.select(struct("*").cast("string").as("value"))
Before you write anything to Kafka you have to convert all columns to string type and alias the result as value.
val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "songDemo")
  .option("startingOffsets", "earliest")
  .load()

import spark.implicits._
df.printSchema()

val newDf = df.select($"value".cast("string"), $"timestamp")
  .select(split(col("value"), ",")(0).as("userName"), split(col("value"), ",")(1).as("songName"), col("timestamp"))

val windowedCount = newDf
  .withWatermark("timestamp", "40000 milliseconds")
  .groupBy(
    window(col("timestamp"), "20 seconds"), col("songName"))
  .agg(count(col("songName")).alias("numberOfTimes"))

val outputTopic = windowedCount
  .select(struct("*").cast("string").as("value")) // Added this line.
  .writeStream
  .format("kafka")
  .option("topic", "songDemoA")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/spark_ss/")
  .start()

val finalOutput = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "songDemoA").option("startingOffsets", "earliest")
  .load()
  .writeStream.format("console")
  .outputMode("append").start()

spark.streams.awaitAnyTermination()
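As a side note, casting the struct to a plain string makes the payload awkward to parse downstream. A hedged alternative (not part of the original answer) is to serialize the row as JSON with to_json on the way out and decode it with from_json when reading the topic back. The sketch below reuses the imports, spark session and windowedCount from the snippet above; the schema is an assumption that mirrors the aggregation (window struct, songName, numberOfTimes), and the checkpoint path is just a placeholder:

// Hypothetical variation: write the aggregated row as JSON instead of a struct-to-string cast.
val jsonOut = windowedCount
  .select(to_json(struct("*")).as("value"))
  .writeStream
  .format("kafka")
  .option("topic", "songDemoA")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/spark_ss_json/") // placeholder checkpoint path
  .start()

// Assumed schema matching the aggregation output.
val resultSchema = new StructType()
  .add("window", new StructType()
    .add("start", TimestampType)
    .add("end", TimestampType))
  .add("songName", StringType)
  .add("numberOfTimes", LongType)

// Read the topic back and turn the JSON payload into columns again.
val parsed = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "songDemoA")
  .option("startingOffsets", "earliest")
  .load()
  .select(from_json($"value".cast("string"), resultSchema).as("data"))
  .select("data.*")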
Updated - sorted output
val windowedCount = newDf
  .withWatermark("timestamp", "40000 milliseconds")
  .groupBy(
    window(col("timestamp"), "20 seconds"), col("songName"))
  .agg(count(col("songName")).alias("numberOfTimes"))
  .orderBy($"window.start".asc) // Add this line if you want order.
Sorting or ordering the results works only if you use the complete output mode; for any other value it will throw an error.
For example, check the code below.
val outputTopic = windowedCount
  .writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode("complete")
  .start()