spark 数据清洗方法

Approach to cleaning Data in spark

我是数据的新手engineering/machine自学自学。在处理示例问题时,我遇到了以下数据清理任务

1. Remove extra whitespaces (keep one whitespace in between word but remove more 
than one whitespaces) and punctuations

2. Turn all the words to lower case and remove stop words (list from NLTK)

3. Remove duplicate words in ASSEMBLY_NAME column

虽然我在大学作业期间致力于编写代码来执行这些任务,但我从来没有在一段代码中完成过(对于任何项目),我在这里寻求专家的指导可以通过指出完成它的最佳方法来帮助我 (in python or scala)

目前完成的工作:

1.从拼花文件中读取数据

partFitmentDF = sqlContext.read.parquet("/mnt/blob/devdatasciencesto/pga-parts-forecast/raw/parts-fits/")

display(partFitmentDF)

2。从 DF

创建 table
partFitmentDF.createOrReplaceTempView("partsFits")
partFitmentDF.write.mode("overwrite").format("delta").saveAsTable("partsFitsTable")

3。重新排列 table 中 fits_assembly_name 的数据,以便所有 fits_assembly_name 和 fits_assembly_id 滚动到每个不同项目的单行

%sql

select itemno, concat_ws(' | ' , collect_set(cast(fits_assembly_id as int))) as fits_assembly_id, concat_ws(' | ' ,collect_set(fits_assembly_name)) as fits_assembly_name 
from partsFitsTable 
WHERE itemno = 1014584
group by itemno

P.S.

来自 partFitmentDF 选定列的示例数据

itemno  fits_assembly_id        fits_assembly_name
0450056 44011           OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06)
0450056 135502          OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)
0450056 37884           OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06)
0450056 19618           OIL PUMP ASSEMBLY - A06FA09CA/CB/CC (4999202399920239A06)
0450056 135021          OIL PUMP ASSEMBLY - A02EA05CA (4999202399920239A06)
0450056 4147            OIL PUMP ASSEMBLY - A04KA05CA (4999202359920235A06)
0450056 12003           OIL PUMP ASSEMBLY - A05FA09CA/CB/CC (4999202399920239A06)

现在,我需要将这些多行按项目号 滚动到一行(所有属于一个项目号的 assembly_name 和 id 应该在一行中) 然后我需要执行最上面列出的任务#1、2 和 3 以清理 fits_assembly_name 列并将处理后的数据保存到最终数据帧或 table 与 itemno , fits_assembly_id 和 fits_assembly_name 列 但我不确定如何在 python 中开始执行此操作。您能否通过建议方法 (和代码提示) 来帮助我,以便我可以进一步完成这项任务?

您想使用 SQL 或 pyspark 函数。

  • The pyspark docs
  • The Spark SQL docs

  • trim(s) -- returns 字符串 s 前导和尾随空格

  • regexp_replace(s, "\W") -- returns 字符串 s 非字母数字字符 ("punctuation") 被空格替换
  • lower(s) -- returns 字符串 s 所有字符均为小写
  • split(s) -- returns 字符串中的一组单词 s 拆分,其中单词边界是一个或多个空格。
  • array_distinct(a) -- returns a
  • 的不同元素

在 pyspark 中像这样把它们放在一起:

from pyspark.sql import *
import pyspark.sql.functions as F

df = spark.createDataFrame( [Row(name="BODY, DECALS - Z19VF99LK/LE (702498)"), Row(name="     ABC  DEF ABC   ")] ) 
clean_df = df.withColumn("words", 
                         F.array_distinct(
                             F.split(
                               F.trim(
                                 F.regexp_replace(F.lower(df.name), "\W", " ")
                               )
                             , " +")
                         )
                        )
clean_df.show(truncate=False)

要生成 words 列,结果为:

+------------------------------------+-------------------------------------+
|name                                |words                                |
+------------------------------------+-------------------------------------+
|BODY, DECALS - Z19VF99LK/LE (702498)|[body, decals, z19vf99lk, le, 702498]|
|     ABC  DEF ABC                   |[abc, def]                           |
+------------------------------------+-------------------------------------+

我不太清楚确切的问题,但这应该能让你朝着正确的方向前进。

检查以下是否适合您。我假设 df 是 groupby 之后的数据帧,collect_set 你已经 运行:

from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.sql.functions import expr

任务 1:使用 RegexTokenizer

使用模式(?:\p{Punct}|\s)+拆分字符串,将结果保存到temp1列。生成的字符串数组将所有项目都小写,leading/trailing 空格也被删除。

tk = RegexTokenizer(pattern=r'(?:\p{Punct}|\s)+', inputCol='fits_assembly_name', outputCol='temp1')

df1 = tk.transform(df)

任务 2:使用 StopWordsRemover

删除停用词并将结果保存到 temp2 列:

sw = StopWordsRemover(inputCol='temp1', outputCol='temp2')

df2 = sw.transform(df1)

您可以通过键入 sw.getStopWords() 检查所有当前停用词,检查 loadDefaultStopWords(language) 以切换到另一种语言设置,或通过以下方式附加您自己的停用词:

mylist = sw.getStopWords() + ['my', 'black', 'list']
# then adjust the transformer to the following
sw = StopWordsRemover(inputCol='temp1', outputCol='temp2', stopWords=mylist)

此时,您应该有一个字符串数组列 temp2,其中删除了停用词。

任务 3:

使用array_distinct()删除重复项,concat_ws()将数组转换为字符串,然后删除两个临时列:

df_new = df2.withColumn('fits_assembly_name', expr('concat_ws(" ", array_distinct(temp2))')) \
            .drop('temp1', 'temp2')

如果您对上述代码有任何问题,请告诉我。