如何使用 writeStream 将 Spark 流传递给 kafka 主题

How to use writeStream to pass Spark stream to a kafka topic

我正在使用提供流的 Twitter 流功能。我需要使用 Spark writeStream 函数,例如:writeStream function link

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

'df' 需要是流媒体 Dataset/DataFrame。如果 df 是一个普通的 DataFrame,它会给出错误显示 'writeStream' can be called only on streaming Dataset/DataFrame;

我已经完成了: 1.从推特上获取流 2.过滤并映射它以获得每个twitt的标签(正面,负面,自然)

最后一步是对每个进行 groupBy 标记和计数,然后将其传递给 Kafka。

你们知道如何将 Dstream 转换为流媒体 Dataset/DataFrame 吗?

Edited: ForeachRDD function does change Dstream to normal DataFrame. But 'writeStream' can be called only on streaming Dataset/DataFrame. (writeStream link is provided above)

org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;

how to transform a Dstream into a streaming Dataset/DataFrame?

DStream是一系列RDD的抽象。

流式 Dataset 是一系列 Dataset 中的一个 "abstraction"(我使用引号是因为流式和批处理 Dataset 之间的区别是 属性 isStreaming Dataset).

可以将 DStream 转换为流 Dataset 以保持 DStream 的行为。

不过我想你并不是真的想要它。

您只需要使用 DStream 获取推文并将它们保存到 Kafka 主题(您认为您需要结构化流)。我认为您只需要 Spark SQL(Structured Streaming 的底层引擎)。

伪代码如下(抱歉,我使用老式的 Spark Streaming 已经有一段时间了):

val spark: SparkSession = ...
val tweets = DStream...
tweets.foreachRDD { rdd =>
  import spark.implicits._
  rdd.toDF.write.format("kafka")...
}