How to homogenize data in a PySpark spark.sql dataframe

I downloaded a 1.9 GB csv file containing Airbnb data. Although every column's data type is "string", some of my columns are not "homogeneous". For example, in the "Amenities" column some entries hold a count of the amenities for that particular property, while others hold a list of the amenities themselves. Everything is in string format.

So here is what I have so far:

from pyspark import SparkContext, SparkConf
import pandas as pd
import numpy as np
conf = SparkConf().setAppName("app")
sc = SparkContext(conf=conf)

from pyspark.sql import SQLContext
SQLCtx = SQLContext(sc)

air = SQLCtx.read.load('/home/john/Downloads/airbnb-listings.csv',
                       format = "com.databricks.spark.csv",
                       header = "true",
                       sep = ";",
                       inferSchema = "true")

# check for missing values in each column
from pyspark.sql.functions import col, sum
air.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in air.columns)).show()

So after dropping a few columns and then dropping missing values, I have this:

Keep = ['Price', 'Bathrooms', 'Bedrooms', 'Beds', 'Bed Type', 'Amenities',
       'Security Deposit', 'Cleaning Fee', 'Guests Included', 'Extra People',
       'Review Scores Rating', 'Cancellation Policy','Host Response Rate', 
       'Country Code', 'Zipcode']

data = air.select(*Keep)
reduced2 = data.na.drop()

#final shape after dropping missing values.
print((reduced2.count(), len(reduced2.columns)))
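
(To double-check the remaining column types at this point, reduced2.printSchema() can be used; per the description above, every column is still a plain string.)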

I can convert a few rows into a pandas dataframe:

df3 = pd.DataFrame(reduced2.take(50), columns = reduced2.columns)
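
(Equivalently, reduced2.limit(50).toPandas() should produce a similar frame.)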

"Amenities" 列表中的一小部分:

Wireless Internet,Air conditioning,Kitchen,Fre...
2                                                    10
3     Internet,Wireless Internet,Air conditioning,Ki...
4     TV,Cable TV,Internet,Wireless Internet,Air con...
5     TV,Wireless Internet,Air conditioning,Pool,Kit...
6     TV,Wireless Internet,Air conditioning,Pool,Kit...
7     Internet,Wireless Internet,Kitchen,Free parkin...
8     TV,Wireless Internet,Air conditioning,Pool,Kit...
9     Wireless Internet,Air conditioning,Kitchen,Fre...
10    TV,Cable TV,Internet,Wireless Internet,Air con...
14                                                   10
16                                                   10
17    TV,Internet,Wireless Internet,Air conditioning...
18    TV,Cable TV,Internet,Wireless Internet,Air con...
19    TV,Internet,Wireless Internet,Air conditioning...
20    TV,Wireless Internet,Air conditioning,Pool,Kit...
23    TV,Cable TV,Internet,Wireless Internet,Air con...
28                                                    9
33                                                   10
34    Internet,Wireless Internet,Kitchen,Elevator in...
37                                                   10

As you can see, I won't be able to work with that as-is. In regular pandas I could easily do something to fix it, like this:

for i in range(len(df3['Amenities'])):
    # entries longer than two characters are comma-separated lists, not counts
    if len(df3['Amenities'][i]) > 2:
        df3.loc[i, 'Amenities'] = str(len(df3['Amenities'][i].split(',')))
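
A vectorized version of the same length heuristic (a sketch against this 50-row frame) would be:

is_list = df3['Amenities'].str.len() > 2
df3.loc[is_list, 'Amenities'] = (
    df3.loc[is_list, 'Amenities'].str.split(',').str.len().astype(str)
)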

Now, I realize this probably isn't the best way to do it, but it turns everything in the column into a number. What I need is a way to do something similar to a column in a PySpark SQL dataframe, if possible.

Thanks!

I'm not familiar with PySpark SQL DataFrames, only with vanilla Pandas.

I'm not sure what your task is, but you could consider splitting that column into two columns, e.g. (assuming this is possible in PySpark):

df['Amenities_count'] = pd.to_numeric(df['Amenities'], errors='coerce')
mask_entries_with_list = df['Amenities_count'].isna()
mask_entries_with_number = ~mask_entries_with_list
# rows that held a plain count carry no list to keep
df.loc[mask_entries_with_number, 'Amenities'] = ''
# rows that held a list get the number of comma-separated items
df.loc[mask_entries_with_list, 'Amenities_count'] = (
    df.loc[mask_entries_with_list, 'Amenities'].str.split(',').str.len()
)

(untested)
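
For what it's worth, the same split does appear to be possible in PySpark; a minimal sketch (assuming the reduced2 frame from the question and the same count-vs-list heuristic):

from pyspark.sql import functions as F

with_count = reduced2.withColumn(
    'Amenities_count',
    F.when(F.col('Amenities').rlike(r'^\d+$'), F.col('Amenities').cast('int'))
     .otherwise(F.size(F.split(F.col('Amenities'), ',')))
)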

If I understand correctly, you want to count the number of items separated by "," but keep the rows that are already numbers. If so, you can try the following:

from pyspark.sql import functions as F

df.withColumn('Amenities',
    F.when(df.Amenities.rlike(r'^\d+$'), df.Amenities)
     .otherwise(F.size(F.split('Amenities', ',')))
     .astype("string")
).show()

So if the Amenities column holds an integer, df.Amenities.rlike(r'^\d+$') matches and we keep it as-is with df.Amenities; otherwise we use F.size() and F.split() to count the items. The result is then cast to "string".
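
A quick sanity check on toy data (hypothetical rows shaped like the real column, reusing the SQLCtx from the question) can confirm the behavior:

toy = SQLCtx.createDataFrame([('10',), ('TV,Internet,Kitchen',)], ['Amenities'])
toy.withColumn('Amenities',
    F.when(toy.Amenities.rlike(r'^\d+$'), toy.Amenities)
     .otherwise(F.size(F.split('Amenities', ',')))
     .astype("string")
).show()
# the "10" row is kept as-is; the list row becomes "3"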