从 Pyspark 中的数据框列创建固定长度的多行

Create multiple rows of fixed length from a data frame column in Pyspark

我的输入是 pyspark 中的数据框列,它只有一列 DETAIL_REC。

detail_df.show()

DETAIL_REC
================================
ABC12345678ABC98765543ABC98762345

detail_df.printSchema()
root
|-- DETAIL_REC: string(nullable =true)

对于每 11 个 char/string 它必须在数据帧的下一行中以便下游进程使用它。

预期输出应该是数据框中的多行

DETAIL_REC (No spaces lines after each record)
==============
ABC12345678
ABC98765543 
ABC98762345 

如果你有 spark 2.4+ 版本,我们可以使用高阶函数来实现,如下所示:

from pyspark.sql import functions as F
n = 11
output = df.withColumn("SubstrCol",F.explode((F.expr(f"""filter(
                                      transform(
                                      sequence(0,length(DETAIL_REC),{n})
                                      ,x-> substring(DETAIL_REC,x+1,{n}))
                                      ,y->y <> '')"""))))

output.show(truncate=False)

+---------------------------------+-----------+
|DETAIL_REC                       |SubstrCol  |
+---------------------------------+-----------+
|ABC12345678ABC98765543ABC98762345|ABC12345678|
|ABC12345678ABC98765543ABC98762345|ABC98765543|
|ABC12345678ABC98765543ABC98762345|ABC98762345|
+---------------------------------+-----------+

使用的逻辑:

  1. 首先生成一个从0开始到字符串长度为11(n)的整数序列
  2. 使用变换遍历此序列并不断从原始字符串中获取子字符串(这会不断改变起始位置。
  3. 从生成的数组中过滤掉所有空白字符串并展开此数组。

对于较低版本的 spark,请将 udf 与 textwrap 或任何其他功能一起使用 here:

from pyspark.sql import functions as F, types as T
from textwrap import wrap
n = 11
myudf = F.udf(lambda x: wrap(x,n),T.ArrayType(T.StringType()))

output = df.withColumn("SubstrCol",F.explode(myudf("DETAIL_REC")))

output.show(truncate=False)

+---------------------------------+-----------+
|DETAIL_REC                       |SubstrCol  |
+---------------------------------+-----------+
|ABC12345678ABC98765543ABC98762345|ABC12345678|
|ABC12345678ABC98765543ABC98762345|ABC98765543|
|ABC12345678ABC98765543ABC98762345|ABC98762345|
+---------------------------------+-----------+