如何从 PySpark DataFrame 中随机取一行?
How take a random row from a PySpark DataFrame?
如何从 PySpark DataFrame 中获取随机行?我只看到 sample()
方法以分数作为参数。将此分数设置为 1/numberOfRows
会导致随机结果,有时我不会得到任何行。
在 RDD
上有一个方法 takeSample()
将您希望样本包含的元素数作为参数。我知道这可能会很慢,因为你必须计算每个分区,但是有没有办法在 DataFrame 上得到这样的东西?
您只需在 RDD
上调用 takeSample
:
df = sqlContext.createDataFrame(
[(1, "a"), (2, "b"), (3, "c"), (4, "d")], ("k", "v"))
df.rdd.takeSample(False, 1, seed=0)
## [Row(k=3, v='c')]
如果你不想收集你可以简单地取一个更高的分数和限制:
df.sample(False, 0.1, seed=0).limit(1)
不要传递一个seed
,你应该每次都得到一个不同的DataFrame。
不同类型的样本
随机抽样 % 的数据有无放回
import pyspark.sql.functions as F
#Randomly sample 50% of the data without replacement
sample1 = df.sample(False, 0.5, seed=0)
#Randomly sample 50% of the data with replacement
sample1 = df.sample(True, 0.5, seed=0)
#Take another sample exlcuding records from previous sample using Anti Join
sample2 = df.join(sample1, on='ID', how='left_anti').sample(False, 0.5, seed=0)
#Take another sample exlcuding records from previous sample using Where
sample1_ids = [row['ID'] for row in sample1.ID]
sample2 = df.where(~F.col('ID').isin(sample1_ids)).sample(False, 0.5, seed=0)
#Generate a startfied sample of the data across column(s)
#Sampling is probabilistic and thus cannot guarantee an exact number of rows
fractions = {
'NJ': 0.5, #Take about 50% of records where state = NJ
'NY': 0.25, #Take about 25% of records where state = NY
'VA': 0.1, #Take about 10% of records where state = VA
}
stratified_sample = df.sampleBy(F.col('state'), fractions, seed=0)
这是使用 Pandas DataFrame.Sample 方法的替代方法。这使用 spark applyInPandas
方法来分发组,可从 Spark 3.0.0 获得。这允许您 select 每组的确切行数。
我已将 args
和 kwargs
添加到函数中,以便您可以访问 DataFrame.Sample
的其他参数。
def sample_n_per_group(n, *args, **kwargs):
def sample_per_group(pdf):
return pdf.sample(n, *args, **kwargs)
return sample_per_group
df = spark.createDataFrame(
[
(1, 1.0),
(1, 2.0),
(2, 3.0),
(2, 5.0),
(2, 10.0)
],
("id", "v")
)
(df.groupBy("id")
.applyInPandas(
sample_n_per_group(1, random_state=2),
schema=df.schema
)
)
要了解非常大的组的限制,来自 documentation:
This function requires a full shuffle. All the data of a group will be
loaded into memory, so the user should be aware of the potential OOM
risk if data is skewed and certain groups are too large to fit in
memory.
如何从 PySpark DataFrame 中获取随机行?我只看到 sample()
方法以分数作为参数。将此分数设置为 1/numberOfRows
会导致随机结果,有时我不会得到任何行。
在 RDD
上有一个方法 takeSample()
将您希望样本包含的元素数作为参数。我知道这可能会很慢,因为你必须计算每个分区,但是有没有办法在 DataFrame 上得到这样的东西?
您只需在 RDD
上调用 takeSample
:
df = sqlContext.createDataFrame(
[(1, "a"), (2, "b"), (3, "c"), (4, "d")], ("k", "v"))
df.rdd.takeSample(False, 1, seed=0)
## [Row(k=3, v='c')]
如果你不想收集你可以简单地取一个更高的分数和限制:
df.sample(False, 0.1, seed=0).limit(1)
不要传递一个seed
,你应该每次都得到一个不同的DataFrame。
不同类型的样本
随机抽样 % 的数据有无放回
import pyspark.sql.functions as F
#Randomly sample 50% of the data without replacement
sample1 = df.sample(False, 0.5, seed=0)
#Randomly sample 50% of the data with replacement
sample1 = df.sample(True, 0.5, seed=0)
#Take another sample exlcuding records from previous sample using Anti Join
sample2 = df.join(sample1, on='ID', how='left_anti').sample(False, 0.5, seed=0)
#Take another sample exlcuding records from previous sample using Where
sample1_ids = [row['ID'] for row in sample1.ID]
sample2 = df.where(~F.col('ID').isin(sample1_ids)).sample(False, 0.5, seed=0)
#Generate a startfied sample of the data across column(s)
#Sampling is probabilistic and thus cannot guarantee an exact number of rows
fractions = {
'NJ': 0.5, #Take about 50% of records where state = NJ
'NY': 0.25, #Take about 25% of records where state = NY
'VA': 0.1, #Take about 10% of records where state = VA
}
stratified_sample = df.sampleBy(F.col('state'), fractions, seed=0)
这是使用 Pandas DataFrame.Sample 方法的替代方法。这使用 spark applyInPandas
方法来分发组,可从 Spark 3.0.0 获得。这允许您 select 每组的确切行数。
我已将 args
和 kwargs
添加到函数中,以便您可以访问 DataFrame.Sample
的其他参数。
def sample_n_per_group(n, *args, **kwargs):
def sample_per_group(pdf):
return pdf.sample(n, *args, **kwargs)
return sample_per_group
df = spark.createDataFrame(
[
(1, 1.0),
(1, 2.0),
(2, 3.0),
(2, 5.0),
(2, 10.0)
],
("id", "v")
)
(df.groupBy("id")
.applyInPandas(
sample_n_per_group(1, random_state=2),
schema=df.schema
)
)
要了解非常大的组的限制,来自 documentation:
This function requires a full shuffle. All the data of a group will be loaded into memory, so the user should be aware of the potential OOM risk if data is skewed and certain groups are too large to fit in memory.