在 Spark Structured Streaming 中逐行拆分 Kafka 消息
Splitting Kafka Message Line by line in Spark Structured Streaming
我想在我的 Spark Structured Streaming 作业中将来自 Kafka 主题的消息读入数据框。但是我在一个偏移量中获取整个消息,因此在数据框中只有这条消息进入一行而不是多行。 (在我的例子中是 3 行)
当我打印这条消息时,我得到以下输出:
消息 "Text1"、"Text2" 和 "Text3" 我希望数据框中有 3 行,以便我可以进一步处理。
请帮帮我
您可以使用 用户定义函数 (UDF) 将消息字符串转换为字符串序列,然后应用 explode 该列上的函数,为序列中的每个元素创建一个新行:
如下图(在scala中,同样的原理适用于pyspark):
case class KafkaMessage(offset: Long, message: String)
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode
val df = sc.parallelize(List(KafkaMessage(1000, "Text1\nText2\nText3"))).toDF()
val splitString = udf { s: String => s.split('\n') }
df.withColumn("splitMsg", explode(splitString($"message")))
.select("offset", "splitMsg")
.show()
这将产生以下输出:
+------+--------+
|offset|splitMsg|
+------+--------+
| 1000| Text1|
| 1000| Text2|
| 1000| Text3|
+------+--------+
我想在我的 Spark Structured Streaming 作业中将来自 Kafka 主题的消息读入数据框。但是我在一个偏移量中获取整个消息,因此在数据框中只有这条消息进入一行而不是多行。 (在我的例子中是 3 行)
当我打印这条消息时,我得到以下输出:
消息 "Text1"、"Text2" 和 "Text3" 我希望数据框中有 3 行,以便我可以进一步处理。
请帮帮我
您可以使用 用户定义函数 (UDF) 将消息字符串转换为字符串序列,然后应用 explode 该列上的函数,为序列中的每个元素创建一个新行:
如下图(在scala中,同样的原理适用于pyspark):
case class KafkaMessage(offset: Long, message: String)
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode
val df = sc.parallelize(List(KafkaMessage(1000, "Text1\nText2\nText3"))).toDF()
val splitString = udf { s: String => s.split('\n') }
df.withColumn("splitMsg", explode(splitString($"message")))
.select("offset", "splitMsg")
.show()
这将产生以下输出:
+------+--------+
|offset|splitMsg|
+------+--------+
| 1000| Text1|
| 1000| Text2|
| 1000| Text3|
+------+--------+