如何在 Pyspark 中检查 Spark 分区的特定分区数据
How to check specific partition data from Spark partitions in Pyspark
我在我的配置单元 table 的 pyspark 中创建了两个数据帧:
data1 = spark.sql("""
SELECT ID, MODEL_NUMBER, MODEL_YEAR ,COUNTRY_CODE
from MODEL_TABLE1 where COUNTRY_CODE in ('IND','CHN','USA','RUS','AUS')
""");
每个国家/地区都有数百万个字母数字格式的唯一 ID。
data2 = spark.sql("""
SELECT ID,MODEL_NUMBER, MODEL_YEAR, COUNTRY_CODE
from MODEL_TABLE2 where COUNTRY_CODE in ('IND','CHN')
""");
我想在 ID 列上使用 pyspark 加入这两个数据框。
我们如何重新分区我们的数据,以便它在分区之间均匀分布。
我可以使用下面的方法来修复我的数据吗?
newdf1 = data2.repartition(100, "ID")
newdf2 = data2.repartition(100, "ID")
最好的分区方式是什么,以便更快地进行连接?
据我所知,您提供 ID 列的方法 repartition
是正确的。将以下内容视为使用 spark_partition_id()
获取相应分区 ID 的概念证明:
创建一些虚拟数据
import pandas as pd
import numpy as np
from pyspark.sql.functions import spark_partition_id
def create_dummy_data():
data = np.vstack([np.random.randint(0, 5, size=10),
np.random.random(10)])
df = pd.DataFrame(data.T, columns=["id", "values"])
return spark.createDataFrame(df)
def show_partition_id(df):
"""Helper function to show partition."""
return df.select(*df.columns, spark_partition_id().alias("pid")).show()
df1 = create_dummy_data()
df2 = create_dummy_data()
重新分区前显示分区 ID
show_partition_id(df1)
+---+-------------------+---+
| id| values|pid|
+---+-------------------+---+
|1.0| 0.6051170383675885| 0|
|3.0| 0.4613520717857513| 0|
|0.0| 0.797734780966592| 1|
|2.0|0.35594664760134587| 1|
|2.0|0.08223203758144915| 2|
|0.0| 0.3112880092048709| 2|
|4.0| 0.2689639324292137| 3|
|1.0| 0.6466782159542134| 3|
|0.0| 0.8340472796153436| 3|
|4.0| 0.8054752411745659| 3|
+---+-------------------+---+
show_partition_id(df2)
+---+-------------------+---+
| id| values|pid|
+---+-------------------+---+
|4.0| 0.8950517294190533| 0|
|3.0| 0.4084717827425539| 0|
|3.0| 0.798146627431009| 1|
|4.0| 0.8039931522181247| 1|
|3.0| 0.732125135531736| 2|
|0.0| 0.536328329270619| 2|
|1.0|0.25952064363007576| 3|
|2.0| 0.1958334111199559| 3|
|0.0| 0.728098753644471| 3|
|0.0| 0.9825387111807906| 3|
+---+-------------------+---+
重新分区后显示分区 ID
show_partition_id(df1.repartition(2, "id"))
+---+-------------------+---+
| id| values|pid|
+---+-------------------+---+
|1.0| 0.6051170383675885| 0|
|3.0| 0.4613520717857513| 0|
|4.0| 0.2689639324292137| 0|
|1.0| 0.6466782159542134| 0|
|4.0| 0.8054752411745659| 0|
|0.0| 0.797734780966592| 1|
|2.0|0.35594664760134587| 1|
|2.0|0.08223203758144915| 1|
|0.0| 0.3112880092048709| 1|
|0.0| 0.8340472796153436| 1|
+---+-------------------+---+
show_partition_id(df2.repartition(2, "id"))
+---+-------------------+---+
| id| values|pid|
+---+-------------------+---+
|4.0| 0.8950517294190533| 0|
|3.0| 0.4084717827425539| 0|
|3.0| 0.798146627431009| 0|
|4.0| 0.8039931522181247| 0|
|3.0| 0.732125135531736| 0|
|1.0|0.25952064363007576| 0|
|0.0| 0.536328329270619| 1|
|2.0| 0.1958334111199559| 1|
|0.0| 0.728098753644471| 1|
|0.0| 0.9825387111807906| 1|
+---+-------------------+---+
重新分区后,ids 0和2位于同一个分区,其余在另一个分区。
我在我的配置单元 table 的 pyspark 中创建了两个数据帧:
data1 = spark.sql("""
SELECT ID, MODEL_NUMBER, MODEL_YEAR ,COUNTRY_CODE
from MODEL_TABLE1 where COUNTRY_CODE in ('IND','CHN','USA','RUS','AUS')
""");
每个国家/地区都有数百万个字母数字格式的唯一 ID。
data2 = spark.sql("""
SELECT ID,MODEL_NUMBER, MODEL_YEAR, COUNTRY_CODE
from MODEL_TABLE2 where COUNTRY_CODE in ('IND','CHN')
""");
我想在 ID 列上使用 pyspark 加入这两个数据框。
我们如何重新分区我们的数据,以便它在分区之间均匀分布。
我可以使用下面的方法来修复我的数据吗?
newdf1 = data2.repartition(100, "ID")
newdf2 = data2.repartition(100, "ID")
最好的分区方式是什么,以便更快地进行连接?
据我所知,您提供 ID 列的方法 repartition
是正确的。将以下内容视为使用 spark_partition_id()
获取相应分区 ID 的概念证明:
创建一些虚拟数据
import pandas as pd
import numpy as np
from pyspark.sql.functions import spark_partition_id
def create_dummy_data():
data = np.vstack([np.random.randint(0, 5, size=10),
np.random.random(10)])
df = pd.DataFrame(data.T, columns=["id", "values"])
return spark.createDataFrame(df)
def show_partition_id(df):
"""Helper function to show partition."""
return df.select(*df.columns, spark_partition_id().alias("pid")).show()
df1 = create_dummy_data()
df2 = create_dummy_data()
重新分区前显示分区 ID
show_partition_id(df1)
+---+-------------------+---+
| id| values|pid|
+---+-------------------+---+
|1.0| 0.6051170383675885| 0|
|3.0| 0.4613520717857513| 0|
|0.0| 0.797734780966592| 1|
|2.0|0.35594664760134587| 1|
|2.0|0.08223203758144915| 2|
|0.0| 0.3112880092048709| 2|
|4.0| 0.2689639324292137| 3|
|1.0| 0.6466782159542134| 3|
|0.0| 0.8340472796153436| 3|
|4.0| 0.8054752411745659| 3|
+---+-------------------+---+
show_partition_id(df2)
+---+-------------------+---+
| id| values|pid|
+---+-------------------+---+
|4.0| 0.8950517294190533| 0|
|3.0| 0.4084717827425539| 0|
|3.0| 0.798146627431009| 1|
|4.0| 0.8039931522181247| 1|
|3.0| 0.732125135531736| 2|
|0.0| 0.536328329270619| 2|
|1.0|0.25952064363007576| 3|
|2.0| 0.1958334111199559| 3|
|0.0| 0.728098753644471| 3|
|0.0| 0.9825387111807906| 3|
+---+-------------------+---+
重新分区后显示分区 ID
show_partition_id(df1.repartition(2, "id"))
+---+-------------------+---+
| id| values|pid|
+---+-------------------+---+
|1.0| 0.6051170383675885| 0|
|3.0| 0.4613520717857513| 0|
|4.0| 0.2689639324292137| 0|
|1.0| 0.6466782159542134| 0|
|4.0| 0.8054752411745659| 0|
|0.0| 0.797734780966592| 1|
|2.0|0.35594664760134587| 1|
|2.0|0.08223203758144915| 1|
|0.0| 0.3112880092048709| 1|
|0.0| 0.8340472796153436| 1|
+---+-------------------+---+
show_partition_id(df2.repartition(2, "id"))
+---+-------------------+---+
| id| values|pid|
+---+-------------------+---+
|4.0| 0.8950517294190533| 0|
|3.0| 0.4084717827425539| 0|
|3.0| 0.798146627431009| 0|
|4.0| 0.8039931522181247| 0|
|3.0| 0.732125135531736| 0|
|1.0|0.25952064363007576| 0|
|0.0| 0.536328329270619| 1|
|2.0| 0.1958334111199559| 1|
|0.0| 0.728098753644471| 1|
|0.0| 0.9825387111807906| 1|
+---+-------------------+---+
重新分区后,ids 0和2位于同一个分区,其余在另一个分区。