将流数据帧写入kafka
Writing streaming dataframe to kafka
我正在通过 spark 结构化流从 kafka 主题读取日志行,分离日志行的字段,对字段执行一些操作并将其存储在数据框中,每个字段都有单独的列。我想把这个数据帧写到kafka
下面是我的示例数据框和将其写入 kafka 的写入流
val dfStructuredWrite = dfProcessedLogs.select(
dfProcessedLogs("result").getItem("_1").as("col1"),
dfProcessedLogs("result").getItem("_2").as("col2"),
dfProcessedLogs("result").getItem("_17").as("col3"))
dfStructuredWrite
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
以上代码给出了以下错误
Required attribute 'value' not found
我相信这是因为我在 key/value 中没有我的数据框 format.How 我可以以最有效的方式将我现有的数据框写入 kafka 吗?
写入 Kafka 的 Dataframe 应该在架构中包含以下列:
- key(可选)(类型:字符串或二进制)
- 值(必填)(类型:字符串或二进制)
- 主题(可选)(类型:字符串)
在您的情况下,没有 value
列并且抛出异常。
您必须修改它以至少添加值列,例如:
import org.apache.spark.sql.functions.{concat, lit}
dfStructuredWrite.select(concat($"col1", lit(" "), $"col2", lit(" "), $"col3").alias("value"))
我正在通过 spark 结构化流从 kafka 主题读取日志行,分离日志行的字段,对字段执行一些操作并将其存储在数据框中,每个字段都有单独的列。我想把这个数据帧写到kafka
下面是我的示例数据框和将其写入 kafka 的写入流
val dfStructuredWrite = dfProcessedLogs.select(
dfProcessedLogs("result").getItem("_1").as("col1"),
dfProcessedLogs("result").getItem("_2").as("col2"),
dfProcessedLogs("result").getItem("_17").as("col3"))
dfStructuredWrite
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
以上代码给出了以下错误
Required attribute 'value' not found
我相信这是因为我在 key/value 中没有我的数据框 format.How 我可以以最有效的方式将我现有的数据框写入 kafka 吗?
写入 Kafka 的 Dataframe 应该在架构中包含以下列:
- key(可选)(类型:字符串或二进制)
- 值(必填)(类型:字符串或二进制)
- 主题(可选)(类型:字符串)
在您的情况下,没有 value
列并且抛出异常。
您必须修改它以至少添加值列,例如:
import org.apache.spark.sql.functions.{concat, lit}
dfStructuredWrite.select(concat($"col1", lit(" "), $"col2", lit(" "), $"col3").alias("value"))