从 Pyspark 中的数据框列创建固定长度的多行
Create multiple rows of fixed length from a data frame column in Pyspark
我的输入是 pyspark 中的数据框列,它只有一列 DETAIL_REC。
detail_df.show()
DETAIL_REC
================================
ABC12345678ABC98765543ABC98762345
detail_df.printSchema()
root
|-- DETAIL_REC: string(nullable =true)
对于每 11 个 char/string 它必须在数据帧的下一行中以便下游进程使用它。
预期输出应该是数据框中的多行
DETAIL_REC (No spaces lines after each record)
==============
ABC12345678
ABC98765543
ABC98762345
如果你有 spark 2.4+ 版本,我们可以使用高阶函数来实现,如下所示:
from pyspark.sql import functions as F
n = 11
output = df.withColumn("SubstrCol",F.explode((F.expr(f"""filter(
transform(
sequence(0,length(DETAIL_REC),{n})
,x-> substring(DETAIL_REC,x+1,{n}))
,y->y <> '')"""))))
output.show(truncate=False)
+---------------------------------+-----------+
|DETAIL_REC |SubstrCol |
+---------------------------------+-----------+
|ABC12345678ABC98765543ABC98762345|ABC12345678|
|ABC12345678ABC98765543ABC98762345|ABC98765543|
|ABC12345678ABC98765543ABC98762345|ABC98762345|
+---------------------------------+-----------+
使用的逻辑:
- 首先生成一个从0开始到字符串长度为11(n)的整数序列
- 使用变换遍历此序列并不断从原始字符串中获取子字符串(这会不断改变起始位置。
- 从生成的数组中过滤掉所有空白字符串并展开此数组。
对于较低版本的 spark,请将 udf 与 textwrap 或任何其他功能一起使用 here:
from pyspark.sql import functions as F, types as T
from textwrap import wrap
n = 11
myudf = F.udf(lambda x: wrap(x,n),T.ArrayType(T.StringType()))
output = df.withColumn("SubstrCol",F.explode(myudf("DETAIL_REC")))
output.show(truncate=False)
+---------------------------------+-----------+
|DETAIL_REC |SubstrCol |
+---------------------------------+-----------+
|ABC12345678ABC98765543ABC98762345|ABC12345678|
|ABC12345678ABC98765543ABC98762345|ABC98765543|
|ABC12345678ABC98765543ABC98762345|ABC98762345|
+---------------------------------+-----------+
我的输入是 pyspark 中的数据框列,它只有一列 DETAIL_REC。
detail_df.show()
DETAIL_REC
================================
ABC12345678ABC98765543ABC98762345
detail_df.printSchema()
root
|-- DETAIL_REC: string(nullable =true)
对于每 11 个 char/string 它必须在数据帧的下一行中以便下游进程使用它。
预期输出应该是数据框中的多行
DETAIL_REC (No spaces lines after each record)
==============
ABC12345678
ABC98765543
ABC98762345
如果你有 spark 2.4+ 版本,我们可以使用高阶函数来实现,如下所示:
from pyspark.sql import functions as F
n = 11
output = df.withColumn("SubstrCol",F.explode((F.expr(f"""filter(
transform(
sequence(0,length(DETAIL_REC),{n})
,x-> substring(DETAIL_REC,x+1,{n}))
,y->y <> '')"""))))
output.show(truncate=False)
+---------------------------------+-----------+
|DETAIL_REC |SubstrCol |
+---------------------------------+-----------+
|ABC12345678ABC98765543ABC98762345|ABC12345678|
|ABC12345678ABC98765543ABC98762345|ABC98765543|
|ABC12345678ABC98765543ABC98762345|ABC98762345|
+---------------------------------+-----------+
使用的逻辑:
- 首先生成一个从0开始到字符串长度为11(n)的整数序列
- 使用变换遍历此序列并不断从原始字符串中获取子字符串(这会不断改变起始位置。
- 从生成的数组中过滤掉所有空白字符串并展开此数组。
对于较低版本的 spark,请将 udf 与 textwrap 或任何其他功能一起使用 here:
from pyspark.sql import functions as F, types as T
from textwrap import wrap
n = 11
myudf = F.udf(lambda x: wrap(x,n),T.ArrayType(T.StringType()))
output = df.withColumn("SubstrCol",F.explode(myudf("DETAIL_REC")))
output.show(truncate=False)
+---------------------------------+-----------+
|DETAIL_REC |SubstrCol |
+---------------------------------+-----------+
|ABC12345678ABC98765543ABC98762345|ABC12345678|
|ABC12345678ABC98765543ABC98762345|ABC98765543|
|ABC12345678ABC98765543ABC98762345|ABC98762345|
+---------------------------------+-----------+