增量添加到 Hive table w/Scala + Spark 1.3
Incrementally adding to a Hive table w/Scala + Spark 1.3
我们的集群有 Spark 1.3 和 Hive
我需要向其中添加随机选择的行的大型 Hive table。
我读取并检查了一个较小的 table 条件,如果该条件为真,那么我会获取我需要的变量,然后查询要填充的随机行。我所做的是对该条件进行查询,table.where(value<number)
,然后使用 take(num rows)
将其制成一个数组。然后,由于所有这些行都包含我需要的信息,即需要从大型配置单元 table 中随机获取哪些行,因此我遍历数组。
当我执行查询时,我在查询中使用 ORDER BY RAND()
(使用 sqlContext
)。我创建了一个 var Hive table
(将成为 mutable),从较大的 table 添加一列。在循环中,我做了一个 unionAll newHiveTable = newHiveTable.unionAll(random_rows)
我已经尝试了很多不同的方法来做到这一点,但我不确定避免 CPU 和临时磁盘使用的最佳方法是什么。我知道 Dataframes 不适用于增量添加。
我现在要尝试的一件事是创建一个 cvs 文件,在循环中将随机行递增地写入该文件,然后在循环完成后,将 cvs 文件加载为 table,然后执行一个 unionAll得到我的最终 table.
任何反馈都会很棒。谢谢
我建议您使用 hive 创建一个外部 table,定义位置,然后让 spark 将输出作为 csv 写入该目录:
在 Hive 中:
create external table test(key string, value string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ';'
LOCATION '/SOME/HDFS/LOCATION'
然后在 https://github.com/databricks/spark-csv 的帮助下从 spark 将数据帧写入 csv 文件并附加到现有文件:
df.write.format("com.databricks.spark.csv").save("/SOME/HDFS/LOCATION/", SaveMode.Append)
我们的集群有 Spark 1.3 和 Hive
我需要向其中添加随机选择的行的大型 Hive table。
我读取并检查了一个较小的 table 条件,如果该条件为真,那么我会获取我需要的变量,然后查询要填充的随机行。我所做的是对该条件进行查询,table.where(value<number)
,然后使用 take(num rows)
将其制成一个数组。然后,由于所有这些行都包含我需要的信息,即需要从大型配置单元 table 中随机获取哪些行,因此我遍历数组。
当我执行查询时,我在查询中使用 ORDER BY RAND()
(使用 sqlContext
)。我创建了一个 var Hive table
(将成为 mutable),从较大的 table 添加一列。在循环中,我做了一个 unionAll newHiveTable = newHiveTable.unionAll(random_rows)
我已经尝试了很多不同的方法来做到这一点,但我不确定避免 CPU 和临时磁盘使用的最佳方法是什么。我知道 Dataframes 不适用于增量添加。 我现在要尝试的一件事是创建一个 cvs 文件,在循环中将随机行递增地写入该文件,然后在循环完成后,将 cvs 文件加载为 table,然后执行一个 unionAll得到我的最终 table.
任何反馈都会很棒。谢谢
我建议您使用 hive 创建一个外部 table,定义位置,然后让 spark 将输出作为 csv 写入该目录:
在 Hive 中:
create external table test(key string, value string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ';'
LOCATION '/SOME/HDFS/LOCATION'
然后在 https://github.com/databricks/spark-csv 的帮助下从 spark 将数据帧写入 csv 文件并附加到现有文件:
df.write.format("com.databricks.spark.csv").save("/SOME/HDFS/LOCATION/", SaveMode.Append)