从pyspark中的列中提取多个子字符串

Extract multiple substrings from column in pyspark

我有一个只有一列的 pyspark DataFrame,如下所示:

df = spark.createDataFrame(["This is AD185E000834", "U1JG97297 And ODNO926902 etc.","DIHK2975290;HI22K2390279; DSM928HK08", "there is nothing here."], "string").toDF("col1")

我想将 col1 中的代码提取到其他列,例如:

df.col2 = ["AD185E000834", "U1JG97297", "DIHK2975290", None]
df.col3 = [None, "ODNO926902", "HI22K2390279", None]
df.col4 = [None, None, "DSM928HK08", None]

有人知道怎么做吗?非常感谢。

我相信这可以缩短。竭尽全力为您提供我的逻辑。如果您在问题中列出您的逻辑会更容易

#split string into array
df1=df.withColumn('k', split(col('col1'),'\s|\;')).withColumn('j', size('k'))

#compute maximum array length
s=df1.agg(max('j').alias('max')).distinct().collect()[0][0]


df1 =(df1.withColumn('k',expr("filter(k, x -> x rlike('^[A-Z0-9]+$'))"))#Filter only non alphanumeric characters in the array
     
      #Convert resulting array into struct to allow split
      .withColumn(
    "k",
    F.struct(*[
        F.col("k")[i].alias(f"col{i+2}") for i in range(s)
    ])
))

#Split struct column in df1 and join back to df
df.join(df1.select('col1','k.*'),how='left', on='col1').show()

+--------------------+------------+------------+----------+----+
|                col1|        col2|        col3|      col4|col5|
+--------------------+------------+------------+----------+----+
|DIHK2975290;HI22K...| DIHK2975290|HI22K2390279|DSM928HK08|null|
|This is AD185E000834|AD185E000834|        null|      null|null|
|U1JG97297 And ODN...|   U1JG97297|  ODNO926902|      null|null|
|there is nothing ...|        null|        null|      null|null|
+--------------------+------------+------------+----------+----+

正如您在评论中所说,这里我们假设您的“代码”是至少两个字符的字符串,仅由大写字母和数字组成。

也就是说,从 Spark 3.1+ 开始,您可以使用 regexp_extract_allexpr 函数来创建包含所有代码的临时数组列,然后为每个条目动态创建多个列数组的数量。

import pyspark.sql.functions as F

# create an array with all the identified "codes"
new_df = df.withColumn('myarray', F.expr("regexp_extract_all(col1, '([A-Z0-9]{2,})', 1)"))

# find the maximum amount of codes identified in a single string
max_array_length = new_df.withColumn('array_length', F.size('myarray')).agg({'array_length': 'max'}).collect()[0][0]
print('Max array length: {}'.format(max_array_length))

# explode the array in multiple columns
new_df.select('col1', *[new_df.myarray[i].alias('col' + str(i+2)) for i in range(max_array_length)]) \
  .show(truncate=False)



Max array length: 3
+------------------------------------+------------+------------+----------+
|col1                                |col2        |col3        |col4      |
+------------------------------------+------------+------------+----------+
|This is AD185E000834                |AD185E000834|null        |null      |
|U1JG97297 And ODNO926902 etc.       |U1JG97297   |ODNO926902  |null      |
|DIHK2975290;HI22K2390279; DSM928HK08|DIHK2975290 |HI22K2390279|DSM928HK08|
|there is nothing here.              |null        |null        |null      |
+------------------------------------+------------+------------+----------+