将循环的输出写入数据帧
Write an output of a loop into a dataframe
我编写了这个 Scala 代码来为 Spark DataFrame 中的每一行做一些事情。基本上这些是我做的步骤
1. I convert the DataFrame into an array
2. Iterate through the array and perform calculations and get the output in an array
3. convert the output of the array to a dataframe and then make a Hive table.
在第 2 步中,当我 运行 一百万条记录时,我遇到了问题。无论如何我可以提高性能。仅供参考,我只将数据帧转换为数组,因为无法迭代 AFAIK spark 数据帧。
def getRows (ca : org.apache.spark.sql.DataFrame ) =
{
val allca = List()
for (a <- ca.collect()) yield
{
val newAddress = a.getString(1)
val output = newAddress :: getRecursiveList(newAddress).reverse
val dataset =
CA (account.getInt(0),
account.getString(1),
account.getString(2),
output.toString)
dataset :: allca
}
}
val myArray = getRows(customerAccounts)
val OutputDataFrame = sc.parallelize(myArray.flatMap(x => x)).toDF
OutputDataFrame.show()
val resultsRDD = OutputDataFrame.registerTempTable("history")
spark.sql(""" insert into user_tech.history select * from history """).collect.foreach(println)
请了解一些基础知识:
- Spark scala/Java API 提供了一个非常高层次的视角,并没有提供任何关于数据结构分布式性质的想法。
- 迭代数据帧有两种选择:你是以分布式方式迭代它们还是在一台机器上收集所有数据然后迭代。
ca.collect() 正在从所有节点的数据帧中收集数据,并将数据发送给驱动程序进行处理,这不是可扩展的解决方案。
请点击以下链接以更好地理解
我编写了这个 Scala 代码来为 Spark DataFrame 中的每一行做一些事情。基本上这些是我做的步骤
1. I convert the DataFrame into an array
2. Iterate through the array and perform calculations and get the output in an array
3. convert the output of the array to a dataframe and then make a Hive table.
在第 2 步中,当我 运行 一百万条记录时,我遇到了问题。无论如何我可以提高性能。仅供参考,我只将数据帧转换为数组,因为无法迭代 AFAIK spark 数据帧。
def getRows (ca : org.apache.spark.sql.DataFrame ) =
{
val allca = List()
for (a <- ca.collect()) yield
{
val newAddress = a.getString(1)
val output = newAddress :: getRecursiveList(newAddress).reverse
val dataset =
CA (account.getInt(0),
account.getString(1),
account.getString(2),
output.toString)
dataset :: allca
}
}
val myArray = getRows(customerAccounts)
val OutputDataFrame = sc.parallelize(myArray.flatMap(x => x)).toDF
OutputDataFrame.show()
val resultsRDD = OutputDataFrame.registerTempTable("history")
spark.sql(""" insert into user_tech.history select * from history """).collect.foreach(println)
请了解一些基础知识:
- Spark scala/Java API 提供了一个非常高层次的视角,并没有提供任何关于数据结构分布式性质的想法。
- 迭代数据帧有两种选择:你是以分布式方式迭代它们还是在一台机器上收集所有数据然后迭代。
ca.collect() 正在从所有节点的数据帧中收集数据,并将数据发送给驱动程序进行处理,这不是可扩展的解决方案。
请点击以下链接以更好地理解