如何使用结构化流从 Spark 发布到 Kafka?
How to publish to Kafka from Spark using Structured streaming?
我正在编写一个 Spark 应用程序,它从 Kafka 主题读取消息,在数据库中查找记录,构建新消息并将它们发布到另一个 Kafka 主题。这是我的代码的样子 -
val inputMessagesDataSet: DataSet[InputMessage] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "input-kafka-topic1")
.load()
.select($"value")
.mapPartitions{r =>
val messages: Iterator[InputMessage] = parseMessages(r)
}
inputMessagesDataSet
.writeStream
.foreachBatch(processMessages _)
.trigger(trigger)
.start
.awaitTermination
def processMessages(inputMessageDataSet: Dataset[InputMessage]) = {
// fetch stuff from DB and build a DataSet[OutputMessage]
val outputMessagesDataSet: DataSet[OutputMessage] = ...
// now queue to another kafka topic
outputMessagesDataSet
.writeStream
.trigger(trigger)
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("topic", "output-kafka-topic")
.option("checkpointLocation", loc)
.start
.awaitTermination
}
但是我收到一条错误消息
org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;
在线的
outputMessagesDataSet.writeStream
这似乎是因为 outputMessagesDataSet
不是使用 readStream
创建的。
我没有在原来的 mapPartitions()
中构造 DataSet[OutputMessage]
的原因是因为获取数据库记录等所需的 类 不可序列化,所以它抛出 NotSerializableException
.
如何构造一个新的DataSet并排队到Kafka?
foreachBatch
接受静态数据集,所以你需要使用write
,而不是writeStream
或者,您可以 writeStream.format("kafka")
不使用 forEachBatch
我正在编写一个 Spark 应用程序,它从 Kafka 主题读取消息,在数据库中查找记录,构建新消息并将它们发布到另一个 Kafka 主题。这是我的代码的样子 -
val inputMessagesDataSet: DataSet[InputMessage] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "input-kafka-topic1")
.load()
.select($"value")
.mapPartitions{r =>
val messages: Iterator[InputMessage] = parseMessages(r)
}
inputMessagesDataSet
.writeStream
.foreachBatch(processMessages _)
.trigger(trigger)
.start
.awaitTermination
def processMessages(inputMessageDataSet: Dataset[InputMessage]) = {
// fetch stuff from DB and build a DataSet[OutputMessage]
val outputMessagesDataSet: DataSet[OutputMessage] = ...
// now queue to another kafka topic
outputMessagesDataSet
.writeStream
.trigger(trigger)
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("topic", "output-kafka-topic")
.option("checkpointLocation", loc)
.start
.awaitTermination
}
但是我收到一条错误消息
org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;
在线的
outputMessagesDataSet.writeStream
这似乎是因为 outputMessagesDataSet
不是使用 readStream
创建的。
我没有在原来的 mapPartitions()
中构造 DataSet[OutputMessage]
的原因是因为获取数据库记录等所需的 类 不可序列化,所以它抛出 NotSerializableException
.
如何构造一个新的DataSet并排队到Kafka?
foreachBatch
接受静态数据集,所以你需要使用write
,而不是writeStream
或者,您可以 writeStream.format("kafka")
不使用 forEachBatch