spark 数据清洗方法
Approach to cleaning Data in spark
我是数据的新手engineering/machine自学自学。在处理示例问题时,我遇到了以下数据清理任务
1. Remove extra whitespaces (keep one whitespace in between word but remove more
than one whitespaces) and punctuations
2. Turn all the words to lower case and remove stop words (list from NLTK)
3. Remove duplicate words in ASSEMBLY_NAME column
虽然我在大学作业期间致力于编写代码来执行这些任务,但我从来没有在一段代码中完成过(对于任何项目),我在这里寻求专家的指导可以通过指出完成它的最佳方法来帮助我 (in python or scala)
目前完成的工作:
1.从拼花文件中读取数据
partFitmentDF = sqlContext.read.parquet("/mnt/blob/devdatasciencesto/pga-parts-forecast/raw/parts-fits/")
display(partFitmentDF)
2。从 DF
创建 table
partFitmentDF.createOrReplaceTempView("partsFits")
partFitmentDF.write.mode("overwrite").format("delta").saveAsTable("partsFitsTable")
3。重新排列 table 中 fits_assembly_name 的数据,以便所有 fits_assembly_name 和 fits_assembly_id 滚动到每个不同项目的单行
%sql
select itemno, concat_ws(' | ' , collect_set(cast(fits_assembly_id as int))) as fits_assembly_id, concat_ws(' | ' ,collect_set(fits_assembly_name)) as fits_assembly_name
from partsFitsTable
WHERE itemno = 1014584
group by itemno
P.S.
来自 partFitmentDF 选定列的示例数据
itemno fits_assembly_id fits_assembly_name
0450056 44011 OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06)
0450056 135502 OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)
0450056 37884 OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06)
0450056 19618 OIL PUMP ASSEMBLY - A06FA09CA/CB/CC (4999202399920239A06)
0450056 135021 OIL PUMP ASSEMBLY - A02EA05CA (4999202399920239A06)
0450056 4147 OIL PUMP ASSEMBLY - A04KA05CA (4999202359920235A06)
0450056 12003 OIL PUMP ASSEMBLY - A05FA09CA/CB/CC (4999202399920239A06)
现在,我需要将这些多行按项目号 滚动到一行(所有属于一个项目号的 assembly_name 和 id 应该在一行中) 然后我需要执行最上面列出的任务#1、2 和 3 以清理 fits_assembly_name 列并将处理后的数据保存到最终数据帧或 table 与 itemno , fits_assembly_id 和 fits_assembly_name 列 但我不确定如何在 python 中开始执行此操作。您能否通过建议方法 (和代码提示) 来帮助我,以便我可以进一步完成这项任务?
您想使用 SQL 或 pyspark 函数。
- The pyspark docs
trim(s)
-- returns 字符串 s
前导和尾随空格
regexp_replace(s, "\W")
-- returns 字符串 s
非字母数字字符 ("punctuation") 被空格替换
lower(s)
-- returns 字符串 s
所有字符均为小写
split(s)
-- returns 字符串中的一组单词 s
拆分,其中单词边界是一个或多个空格。
array_distinct(a)
-- returns a 的不同元素
在 pyspark 中像这样把它们放在一起:
from pyspark.sql import *
import pyspark.sql.functions as F
df = spark.createDataFrame( [Row(name="BODY, DECALS - Z19VF99LK/LE (702498)"), Row(name=" ABC DEF ABC ")] )
clean_df = df.withColumn("words",
F.array_distinct(
F.split(
F.trim(
F.regexp_replace(F.lower(df.name), "\W", " ")
)
, " +")
)
)
clean_df.show(truncate=False)
要生成 words
列,结果为:
+------------------------------------+-------------------------------------+
|name |words |
+------------------------------------+-------------------------------------+
|BODY, DECALS - Z19VF99LK/LE (702498)|[body, decals, z19vf99lk, le, 702498]|
| ABC DEF ABC |[abc, def] |
+------------------------------------+-------------------------------------+
我不太清楚确切的问题,但这应该能让你朝着正确的方向前进。
检查以下是否适合您。我假设 df
是 groupby 之后的数据帧,collect_set 你已经 运行:
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.sql.functions import expr
任务 1:使用 RegexTokenizer
使用模式(?:\p{Punct}|\s)+
拆分字符串,将结果保存到temp1
列。生成的字符串数组将所有项目都小写,leading/trailing 空格也被删除。
tk = RegexTokenizer(pattern=r'(?:\p{Punct}|\s)+', inputCol='fits_assembly_name', outputCol='temp1')
df1 = tk.transform(df)
任务 2:使用 StopWordsRemover
删除停用词并将结果保存到 temp2
列:
sw = StopWordsRemover(inputCol='temp1', outputCol='temp2')
df2 = sw.transform(df1)
您可以通过键入 sw.getStopWords()
检查所有当前停用词,检查 loadDefaultStopWords(language) 以切换到另一种语言设置,或通过以下方式附加您自己的停用词:
mylist = sw.getStopWords() + ['my', 'black', 'list']
# then adjust the transformer to the following
sw = StopWordsRemover(inputCol='temp1', outputCol='temp2', stopWords=mylist)
此时,您应该有一个字符串数组列 temp2
,其中删除了停用词。
任务 3:
使用array_distinct()删除重复项,concat_ws()将数组转换为字符串,然后删除两个临时列:
df_new = df2.withColumn('fits_assembly_name', expr('concat_ws(" ", array_distinct(temp2))')) \
.drop('temp1', 'temp2')
如果您对上述代码有任何问题,请告诉我。
我是数据的新手engineering/machine自学自学。在处理示例问题时,我遇到了以下数据清理任务
1. Remove extra whitespaces (keep one whitespace in between word but remove more
than one whitespaces) and punctuations
2. Turn all the words to lower case and remove stop words (list from NLTK)
3. Remove duplicate words in ASSEMBLY_NAME column
虽然我在大学作业期间致力于编写代码来执行这些任务,但我从来没有在一段代码中完成过(对于任何项目),我在这里寻求专家的指导可以通过指出完成它的最佳方法来帮助我 (in python or scala)
目前完成的工作:
1.从拼花文件中读取数据
partFitmentDF = sqlContext.read.parquet("/mnt/blob/devdatasciencesto/pga-parts-forecast/raw/parts-fits/")
display(partFitmentDF)
partFitmentDF.createOrReplaceTempView("partsFits")
partFitmentDF.write.mode("overwrite").format("delta").saveAsTable("partsFitsTable")
3。重新排列 table 中 fits_assembly_name 的数据,以便所有 fits_assembly_name 和 fits_assembly_id 滚动到每个不同项目的单行
%sql
select itemno, concat_ws(' | ' , collect_set(cast(fits_assembly_id as int))) as fits_assembly_id, concat_ws(' | ' ,collect_set(fits_assembly_name)) as fits_assembly_name
from partsFitsTable
WHERE itemno = 1014584
group by itemno
P.S.
来自 partFitmentDF 选定列的示例数据
itemno fits_assembly_id fits_assembly_name
0450056 44011 OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06)
0450056 135502 OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)
0450056 37884 OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06)
0450056 19618 OIL PUMP ASSEMBLY - A06FA09CA/CB/CC (4999202399920239A06)
0450056 135021 OIL PUMP ASSEMBLY - A02EA05CA (4999202399920239A06)
0450056 4147 OIL PUMP ASSEMBLY - A04KA05CA (4999202359920235A06)
0450056 12003 OIL PUMP ASSEMBLY - A05FA09CA/CB/CC (4999202399920239A06)
现在,我需要将这些多行按项目号 滚动到一行(所有属于一个项目号的 assembly_name 和 id 应该在一行中) 然后我需要执行最上面列出的任务#1、2 和 3 以清理 fits_assembly_name 列并将处理后的数据保存到最终数据帧或 table 与 itemno , fits_assembly_id 和 fits_assembly_name 列 但我不确定如何在 python 中开始执行此操作。您能否通过建议方法 (和代码提示) 来帮助我,以便我可以进一步完成这项任务?
您想使用 SQL 或 pyspark 函数。
- The pyspark docs
trim(s)
-- returns 字符串s
前导和尾随空格regexp_replace(s, "\W")
-- returns 字符串s
非字母数字字符 ("punctuation") 被空格替换lower(s)
-- returns 字符串s
所有字符均为小写split(s)
-- returns 字符串中的一组单词s
拆分,其中单词边界是一个或多个空格。array_distinct(a)
-- returns a 的不同元素
在 pyspark 中像这样把它们放在一起:
from pyspark.sql import *
import pyspark.sql.functions as F
df = spark.createDataFrame( [Row(name="BODY, DECALS - Z19VF99LK/LE (702498)"), Row(name=" ABC DEF ABC ")] )
clean_df = df.withColumn("words",
F.array_distinct(
F.split(
F.trim(
F.regexp_replace(F.lower(df.name), "\W", " ")
)
, " +")
)
)
clean_df.show(truncate=False)
要生成 words
列,结果为:
+------------------------------------+-------------------------------------+
|name |words |
+------------------------------------+-------------------------------------+
|BODY, DECALS - Z19VF99LK/LE (702498)|[body, decals, z19vf99lk, le, 702498]|
| ABC DEF ABC |[abc, def] |
+------------------------------------+-------------------------------------+
我不太清楚确切的问题,但这应该能让你朝着正确的方向前进。
检查以下是否适合您。我假设 df
是 groupby 之后的数据帧,collect_set 你已经 运行:
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.sql.functions import expr
任务 1:使用 RegexTokenizer
使用模式(?:\p{Punct}|\s)+
拆分字符串,将结果保存到temp1
列。生成的字符串数组将所有项目都小写,leading/trailing 空格也被删除。
tk = RegexTokenizer(pattern=r'(?:\p{Punct}|\s)+', inputCol='fits_assembly_name', outputCol='temp1')
df1 = tk.transform(df)
任务 2:使用 StopWordsRemover
删除停用词并将结果保存到 temp2
列:
sw = StopWordsRemover(inputCol='temp1', outputCol='temp2')
df2 = sw.transform(df1)
您可以通过键入 sw.getStopWords()
检查所有当前停用词,检查 loadDefaultStopWords(language) 以切换到另一种语言设置,或通过以下方式附加您自己的停用词:
mylist = sw.getStopWords() + ['my', 'black', 'list']
# then adjust the transformer to the following
sw = StopWordsRemover(inputCol='temp1', outputCol='temp2', stopWords=mylist)
此时,您应该有一个字符串数组列 temp2
,其中删除了停用词。
任务 3:
使用array_distinct()删除重复项,concat_ws()将数组转换为字符串,然后删除两个临时列:
df_new = df2.withColumn('fits_assembly_name', expr('concat_ws(" ", array_distinct(temp2))')) \
.drop('temp1', 'temp2')
如果您对上述代码有任何问题,请告诉我。