如何在 Pyspark 中有效地加入一个非常大的 table 和一个大的 table
How to efficiently join a very large table and a large table in Pyspark
我有两个 table。 table 都是以镶木地板数据格式存储的配置单元中的外部 table。
第一个 table table_1 从 2015 年开始每天有 2.5 亿 行。这个 table是根据create_date划分的。所以对于每个 create_date,大约有 250M 行。
第二个 table - table_2 是每日增量 table,平均行数约为 150 万 行。
两个 table 中有一个公共列 "lookup_id"。现在我需要使用数据帧从 table_1 中获取所有列以获取来自 table_2 的增量数据。
我想做如下的事情
table_1=spark.table("table_1")
table_2=spark.table("table_2")
result_df=table_1.join(table_2, table_1.lookup_id=table_2.lookup_id, "inner").drop(table_2.lookup_id)
但我怀疑这是否真的有效以及 pyspark 是否能够在没有任何内存错误的情况下处理它。
问题 1:
如何基于 create_date 个分区并行化 table_1 扫描?
问题二:
是否有任何其他方法可以基于 table_2 and/or 基于分区的 lookup_id 优化 table_1 扫描?
更多信息,让我更清楚地了解我正在寻找的内容:
我试图了解当我们使用数据帧加入 tables 时,spark 是否读取数据并将其保存在内存中并加入它们,或者它只是在读取自身时加入。如果第二个为真,则第二个语句适用的所有连接是什么。另外如果有任何需要使用循环来避免任何内存错误。
不确定您的驱动程序和执行程序内存,但通常有两种可能的连接优化 - 将小 table 广播到所有执行程序并为两个数据帧使用相同的分区键。在您的情况下,如果 table 2 太大而无法广播,则根据您的查找 ID 重新分区将使它更快。但修复有其自身的成本。您可以在此处找到更多信息 - https://umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/avoiding_shuffle_less_stage-_more_fast#:~:text=One%20way%20to%20avoid%20shuffles,then%20broadcast%20to%20every%20executor.
告诉我你的想法。期待这个话题的讨论。
如果你不能广播,一个避免使用分桶加入的例子 - 灵感来自这里:
spark.catalog.setCurrentDatabase(<your databasename>)
test1.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item')
test2.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item1')
#test1.
#%%
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) # this is just to disable auto broadcasting for testing
import pyspark.sql.functions as F
inputDf1 = spark.sql("select * from table_item")
inputDf2 = spark.sql("select * from table_item1")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"),on='item')
现在试试
inputDf3.explain()
结果将是这样的:
== Physical Plan ==
*(3) Project [item#1033, col1#1030, col2#1031, col3#1032, id#1038]
+- *(3) SortMergeJoin [item#1033], [item#1039], Inner
:- *(1) Sort [item#1033 ASC NULLS FIRST], false, 0
: +- *(1) Project [col1#1030, col2#1031, col3#1032, item#1033]
: +- *(1) Filter isnotnull(item#1033)
: +- *(1) FileScan parquet
+- *(2) Sort [item#1039 ASC NULLS FIRST], false, 0
+- *(2) Project [id#1038, item#1039]
+- *(2) Filter isnotnull(item#1039)
+- *(2) FileScan parquet
如您所见,此处没有进行 Exchange 散列分区。因此,请尝试对您的两个数据框进行分桶并尝试加入。
当您阅读 CSV 时..它会自动分区并进行并行处理..基于默认配置(以防我们不做任何更改)
对此的具体回答...如果您在 HDFS 上存储了一个 30GB 的未压缩文本文件,那么使用默认的 HDFS 块大小设置 (128MB) 它将存储在 235 个块中,这意味着您的 RDD从这个文件中读取将有 235 个分区。
现在,这里有两件事 1。 CSV 等平面文件和 2. parquet
等压缩文件
当你有一个文本文件时...当 Spark 从 HDFS 读取文件时,它会为单个输入拆分创建一个分区。输入拆分由用于读取此文件的 Hadoop InputFormat 设置。例如,如果您使用 textFile(),它将是 Hadoop 中的 TextInputFormat,它将 return 为单个 HDFS 块创建一个分区(但分区之间的拆分将在行拆分上完成,而不是确切的块split),除非你有一个压缩的文本文件。
对于镶木地板或压缩文件:在压缩文件的情况下,您将获得单个文件的单个分区(因为压缩文本文件不可拆分)。
现在,由于您使用的是 parquet,这已经很好地分区了,在进行优化时,您可以检查集群大小并查看发生了多少分区等。
所以,回答:问题 1:如何基于 create_date 分区并行化 table_1 扫描? 这已经分区了
对于,问题2:有没有其他方法可以优化table_1基于分区的lookup_ids扫描来自table_2and/or ?
您可以尝试过滤不需要的记录,这个概念在 Spark SQL 查询中称为 Spark 谓词下推,因此即使在将数据加载到内存之前,spark 也会过滤掉不必要的列.. 更多here
Spark predicate push down to database allows for better optimized
Spark queries. A predicate is a condition on a query that returns true
or false, typically located in the WHERE clause. A predicate push down
filters the data in the database query, reducing the number of entries
retrieved from the database and improving query performance. By
default the Spark Dataset API will automatically push down valid WHERE
clauses to the database.
我有两个 table。 table 都是以镶木地板数据格式存储的配置单元中的外部 table。
第一个 table table_1 从 2015 年开始每天有 2.5 亿 行。这个 table是根据create_date划分的。所以对于每个 create_date,大约有 250M 行。
第二个 table - table_2 是每日增量 table,平均行数约为 150 万 行。
两个 table 中有一个公共列 "lookup_id"。现在我需要使用数据帧从 table_1 中获取所有列以获取来自 table_2 的增量数据。
我想做如下的事情
table_1=spark.table("table_1")
table_2=spark.table("table_2")
result_df=table_1.join(table_2, table_1.lookup_id=table_2.lookup_id, "inner").drop(table_2.lookup_id)
但我怀疑这是否真的有效以及 pyspark 是否能够在没有任何内存错误的情况下处理它。
问题 1: 如何基于 create_date 个分区并行化 table_1 扫描?
问题二: 是否有任何其他方法可以基于 table_2 and/or 基于分区的 lookup_id 优化 table_1 扫描?
更多信息,让我更清楚地了解我正在寻找的内容:
我试图了解当我们使用数据帧加入 tables 时,spark 是否读取数据并将其保存在内存中并加入它们,或者它只是在读取自身时加入。如果第二个为真,则第二个语句适用的所有连接是什么。另外如果有任何需要使用循环来避免任何内存错误。
不确定您的驱动程序和执行程序内存,但通常有两种可能的连接优化 - 将小 table 广播到所有执行程序并为两个数据帧使用相同的分区键。在您的情况下,如果 table 2 太大而无法广播,则根据您的查找 ID 重新分区将使它更快。但修复有其自身的成本。您可以在此处找到更多信息 - https://umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/avoiding_shuffle_less_stage-_more_fast#:~:text=One%20way%20to%20avoid%20shuffles,then%20broadcast%20to%20every%20executor.
告诉我你的想法。期待这个话题的讨论。
如果你不能广播,一个避免使用分桶加入的例子 - 灵感来自这里:
spark.catalog.setCurrentDatabase(<your databasename>)
test1.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item')
test2.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item1')
#test1.
#%%
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) # this is just to disable auto broadcasting for testing
import pyspark.sql.functions as F
inputDf1 = spark.sql("select * from table_item")
inputDf2 = spark.sql("select * from table_item1")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"),on='item')
现在试试
inputDf3.explain()
结果将是这样的:
== Physical Plan ==
*(3) Project [item#1033, col1#1030, col2#1031, col3#1032, id#1038]
+- *(3) SortMergeJoin [item#1033], [item#1039], Inner
:- *(1) Sort [item#1033 ASC NULLS FIRST], false, 0
: +- *(1) Project [col1#1030, col2#1031, col3#1032, item#1033]
: +- *(1) Filter isnotnull(item#1033)
: +- *(1) FileScan parquet
+- *(2) Sort [item#1039 ASC NULLS FIRST], false, 0
+- *(2) Project [id#1038, item#1039]
+- *(2) Filter isnotnull(item#1039)
+- *(2) FileScan parquet
如您所见,此处没有进行 Exchange 散列分区。因此,请尝试对您的两个数据框进行分桶并尝试加入。
当您阅读 CSV 时..它会自动分区并进行并行处理..基于默认配置(以防我们不做任何更改)
对此的具体回答...如果您在 HDFS 上存储了一个 30GB 的未压缩文本文件,那么使用默认的 HDFS 块大小设置 (128MB) 它将存储在 235 个块中,这意味着您的 RDD从这个文件中读取将有 235 个分区。
现在,这里有两件事 1。 CSV 等平面文件和 2. parquet
等压缩文件当你有一个文本文件时...当 Spark 从 HDFS 读取文件时,它会为单个输入拆分创建一个分区。输入拆分由用于读取此文件的 Hadoop InputFormat 设置。例如,如果您使用 textFile(),它将是 Hadoop 中的 TextInputFormat,它将 return 为单个 HDFS 块创建一个分区(但分区之间的拆分将在行拆分上完成,而不是确切的块split),除非你有一个压缩的文本文件。
对于镶木地板或压缩文件:在压缩文件的情况下,您将获得单个文件的单个分区(因为压缩文本文件不可拆分)。
现在,由于您使用的是 parquet,这已经很好地分区了,在进行优化时,您可以检查集群大小并查看发生了多少分区等。
所以,回答:问题 1:如何基于 create_date 分区并行化 table_1 扫描? 这已经分区了
对于,问题2:有没有其他方法可以优化table_1基于分区的lookup_ids扫描来自table_2and/or ?
您可以尝试过滤不需要的记录,这个概念在 Spark SQL 查询中称为 Spark 谓词下推,因此即使在将数据加载到内存之前,spark 也会过滤掉不必要的列.. 更多here
Spark predicate push down to database allows for better optimized Spark queries. A predicate is a condition on a query that returns true or false, typically located in the WHERE clause. A predicate push down filters the data in the database query, reducing the number of entries retrieved from the database and improving query performance. By default the Spark Dataset API will automatically push down valid WHERE clauses to the database.