在 Spark Dataframe 行上并行操作
Operating in parallel on a Spark Dataframe Rows
环境:Scala、spark、结构化流、kafka
我有一个来自具有以下架构的 kafka 流的 DF
DF:
BATCH ID: 0
+-----------------------+-----+---------+------+
| value|topic|partition|offset|
+-----------------------+-----+---------+------+
|{"big and nested json"}| A | 0| 0|
|{"big and nested json"}| B | 0| 0|
+-----------------------+-----+---------+------+
我想使用 spark 并行处理每一行,我设法使用
将它们拆分给我的执行程序
DF.repartition(Number).foreach(row=> processRow(row))
我需要从值列中提取值到它自己的数据框中来处理它。
我在使用 Dataframe 通用 Row 对象时遇到困难..
有没有办法将每个执行程序中的单行转换为它自己的 Dataframe(使用固定模式?)并写入固定位置?
有没有更好的方法来解决我的问题?
编辑 + 澄清:
使用 spark2.4
以来存在的 writeStream 功能的 forEachBatch
功能,DF 即时消息接收作为批处理出现
目前将 DF 拆分为 ROWS 使得行将平均拆分到我的所有执行程序中,我想将单个 GenericRow 对象转换为 DataFrame 以便我可以使用我制作的函数进行处理
例如,我会将行发送到函数
processRow(row:row)
取值和主题,转回单行DF
+-----------------------+-----+
| value|topic|
+-----------------------+-----+
|{"big and nested json"}| A |
+-----------------------+-----+
待进一步处理
在这种情况下,更适合使用 .map
而不是 .foreach
。原因是 map
return 是一个新数据集,而 foreach
只是一个函数,return 什么都没有。
另一件可以帮助您的事情是解析位于 JSON.
中的模式
我最近也有类似的需求。
我的 JSON 对象对主题 A
和 B
都有一个“相似”的架构。如果您不是这种情况,您可能需要通过按主题分组在下面的解决方案中创建多个 dataframes
。
val sanitiseJson: String => String = value => value
.replace("\\"", "\"")
.replace("\\", "\")
.replace("\n", "")
.replace("\"{", "{")
.replace("}\"", "}")
val parsed = df.toJSON
.map(sanitiseJson)
这会给你这样的东西:
{
"value": { ... },
"topic": "A"
}
然后你可以将它传递给一个新的 read
函数:
var dfWithSchema = spark.read.json(parsed)
此时您将访问嵌套 JSON:
中的值
dfWithSchema.select($"value.propertyInJson")
如果需要,您可以对 sanitiseJson
进行一些优化。
我猜你一次消费了多个kafka数据。
首先你需要为所有kafka主题准备schema
,例如我在值列中使用了两个不同的JSON。
scala> val df = Seq(("""{"name":"Srinivas"}""","A"),("""{"age":20}""","B")).toDF("value","topic")
scala> df.show(false)
+-------------------+-----+
|value |topic|
+-------------------+-----+
|{"name":"Srinivas"}|A |
|{"age":20} |B |
+-------------------+-----+
scala> import org.apache.spark.sql.types._
主题 A 的架构
scala> val topicASchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
主题 B 的架构
scala> val topicBSchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
组合主题及其架构。
scala> val topicSchema = Seq(("A",topicASchema),("B",topicBSchema)) // Adding Topic & Its Schema.
正在处理数据帧
scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)))
.foreach(_.show(false)) // Using .par & filtering dataframe based on topic & then applying schema to value column.
+----------+-----+
|value |topic|
+----------+-----+
|[Srinivas]|A |
+----------+-----+
+-----+-----+
|value|topic|
+-----+-----+
|[20] |B |
+-----+-----+
写入 hdfs
scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)).write.format("json").save(s"/tmp/kafka_data/${d._1}"))
最终数据存储在hdfs
scala> import sys.process._
import sys.process._
scala> "tree /tmp/kafka_data".!
/tmp/kafka_data
├── A
│ ├── part-00000-1e854106-49de-44b3-ab18-6c98a126c8ca-c000.json
│ └── _SUCCESS
└── B
├── part-00000-1bd51ad7-cfb6-4187-a374-4e2d4ce9cc50-c000.json
└── _SUCCESS
2 directories, 4 files
环境:Scala、spark、结构化流、kafka
我有一个来自具有以下架构的 kafka 流的 DF
DF:
BATCH ID: 0
+-----------------------+-----+---------+------+
| value|topic|partition|offset|
+-----------------------+-----+---------+------+
|{"big and nested json"}| A | 0| 0|
|{"big and nested json"}| B | 0| 0|
+-----------------------+-----+---------+------+
我想使用 spark 并行处理每一行,我设法使用
将它们拆分给我的执行程序DF.repartition(Number).foreach(row=> processRow(row))
我需要从值列中提取值到它自己的数据框中来处理它。 我在使用 Dataframe 通用 Row 对象时遇到困难..
有没有办法将每个执行程序中的单行转换为它自己的 Dataframe(使用固定模式?)并写入固定位置? 有没有更好的方法来解决我的问题?
编辑 + 澄清:
使用 spark2.4
forEachBatch
功能,DF 即时消息接收作为批处理出现
目前将 DF 拆分为 ROWS 使得行将平均拆分到我的所有执行程序中,我想将单个 GenericRow 对象转换为 DataFrame 以便我可以使用我制作的函数进行处理
例如,我会将行发送到函数
processRow(row:row)
取值和主题,转回单行DF
+-----------------------+-----+
| value|topic|
+-----------------------+-----+
|{"big and nested json"}| A |
+-----------------------+-----+
待进一步处理
在这种情况下,更适合使用 .map
而不是 .foreach
。原因是 map
return 是一个新数据集,而 foreach
只是一个函数,return 什么都没有。
另一件可以帮助您的事情是解析位于 JSON.
中的模式我最近也有类似的需求。
我的 JSON 对象对主题 A
和 B
都有一个“相似”的架构。如果您不是这种情况,您可能需要通过按主题分组在下面的解决方案中创建多个 dataframes
。
val sanitiseJson: String => String = value => value
.replace("\\"", "\"")
.replace("\\", "\")
.replace("\n", "")
.replace("\"{", "{")
.replace("}\"", "}")
val parsed = df.toJSON
.map(sanitiseJson)
这会给你这样的东西:
{
"value": { ... },
"topic": "A"
}
然后你可以将它传递给一个新的 read
函数:
var dfWithSchema = spark.read.json(parsed)
此时您将访问嵌套 JSON:
中的值dfWithSchema.select($"value.propertyInJson")
如果需要,您可以对 sanitiseJson
进行一些优化。
我猜你一次消费了多个kafka数据。
首先你需要为所有kafka主题准备schema
,例如我在值列中使用了两个不同的JSON。
scala> val df = Seq(("""{"name":"Srinivas"}""","A"),("""{"age":20}""","B")).toDF("value","topic")
scala> df.show(false)
+-------------------+-----+
|value |topic|
+-------------------+-----+
|{"name":"Srinivas"}|A |
|{"age":20} |B |
+-------------------+-----+
scala> import org.apache.spark.sql.types._
主题 A 的架构
scala> val topicASchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
主题 B 的架构
scala> val topicBSchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
组合主题及其架构。
scala> val topicSchema = Seq(("A",topicASchema),("B",topicBSchema)) // Adding Topic & Its Schema.
正在处理数据帧
scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)))
.foreach(_.show(false)) // Using .par & filtering dataframe based on topic & then applying schema to value column.
+----------+-----+
|value |topic|
+----------+-----+
|[Srinivas]|A |
+----------+-----+
+-----+-----+
|value|topic|
+-----+-----+
|[20] |B |
+-----+-----+
写入 hdfs
scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)).write.format("json").save(s"/tmp/kafka_data/${d._1}"))
最终数据存储在hdfs
scala> import sys.process._
import sys.process._
scala> "tree /tmp/kafka_data".!
/tmp/kafka_data
├── A
│ ├── part-00000-1e854106-49de-44b3-ab18-6c98a126c8ca-c000.json
│ └── _SUCCESS
└── B
├── part-00000-1bd51ad7-cfb6-4187-a374-4e2d4ce9cc50-c000.json
└── _SUCCESS
2 directories, 4 files