Pyspark 根据账户创建批号列

Pyspark create batch number column based on account

假设我有一个包含多个唯一帐户值的 pyspark 数据框,每个帐户值都有一个唯一的条目数,如下所示:

+-------------_+--------+--------+
| account| col1|  col2  | col3   |
+--------+-----+--------+--------+
| 325235 |   59|      -6|  625.64|
| 325235 |   23|    -282|  923.47|
| 325235 |   77|-1310.89| 3603.48|
| 245623 |  120|    1.53| 1985.63|
| 245623 |  106|     -12| 1985.06|
| 658567 |   84|     -12|  194.67|

我想指定一个批次大小,并根据批次大小将多个帐户分配给同一个批次。假设我选择批量大小 = 2,则输出应如下所示:

+-------------_+--------+--------+--------------+
| account| col1|  col2  | col3   | batch_number |
+--------+-----+--------+--------+--------------+
| 325235 |   59|      -6|  625.64|             1|
| 325235 |   23|    -282|  923.47|             1|
| 325235 |   77|-1310.89| 3603.48|             1|
| 245623 |  120|    1.53| 1985.63|             1|
| 245623 |  106|     -12| 1985.06|             1|
| 658567 |   84|     -12|  194.67|             2|

然后我可以在 batch_number 列上进行分组,并且每批有多个帐户。这是我的工作代码,但它太慢了,因为我正在做一个 toPandas()。

# Get unique accounts in source data
accounts = [row.account for row in source_data.select("account").distinct().collect()]
    
# Find number of batches based. Last batch will have size = remainder
num_batches, remainder = divmod(len(accounts), batchsize)
    
# Create batch dataframe where a batch number is assigned to each account.
batches = [i for _ in range(batchsize) for i in range(1, int(num_batches) + 1)] + [num_batches + 1 for i in range(remainder)]
batch_df = pd.DataFrame({"account": accounts, "batch_number": batches}, columns=["account", "batch_number"]).set_index("account")
    
# Add a zero column for batch number to source data which will be populated
source_data = source_data.withColumn("batch_number", lit(0))
    
# Map batch numbers of accounts back into the source data
source_data_p = source_data.toPandas()
for ind in source_data_p.index:
    source_data_p.at[ind, "batch_number"] = batch_df.at[source_data_p.at[ind, "account"], "batch_number"]
        
# Convert mapped pandas df back to spark df
batched_df = sqlcontext.createDataFrame(source_data_p)

理想情况下,我希望摆脱 toPandas() 调用,并在 pyspark 中进行映射。我看过一些相关的帖子,比如这个:,但这不符合我的代码流程,所以我将不得不重新编写整个项目来实现它。

据我了解,您可以使用 mllib 或任何其他方式使用索引器,然后进行楼层划分:

import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer

n=2

idx = StringIndexer(inputCol="account",outputCol="batch_number")

(idx.fit(df).transform(df)
    .withColumn("batch_number",F.floor(F.col("batch_number")/n)+1)).show()

+-------+----+--------+-------+------------+
|account|col1|    col2|   col3|batch_number|
+-------+----+--------+-------+------------+
| 325235|  59|    -6.0| 625.64|           1|
| 325235|  23|  -282.0| 923.47|           1|
| 325235|  77|-1310.89|3603.48|           1|
| 245623| 120|    1.53|1985.63|           1|
| 245623| 106|   -12.0|1985.06|           1|
| 658567|  84|   -12.0| 194.67|           2|
+-------+----+--------+-------+------------+