过滤 Spark 数据框

Filtering Spark Dataframe

我创建了一个数据框:

ratings = imdb_data.sort('imdbRating').select('imdbRating').filter('imdbRating is NOT NULL')

在执行如下所示的 ratings.show() 后,我可以看到 imdbRating 字段具有混合类型的数据,例如随机字符串、电影标题、电影 url 和实际评级。所以脏数据看起来是这样的:

+--------------------+
|          imdbRating|
+--------------------+
|Mary (TV Episode...|
| Paranormal Activ...|
| Sons (TV Episode...|
|        Spion (2011)|
| Winter... und Fr...|
| and Gays (TV Epi...|
| grAs - Die Serie...|
| hat die Wahl (2000)|
|                 1.0|
|                 1.3|
|                 1.4|
|                 1.5|
|                 1.5|
|                 1.5|
|                 1.6|
|                 1.6|
|                 1.7|
|                 1.9|
|                 1.9|
|                 1.9|
+--------------------+
only showing top 20 rows

有没有什么方法可以过滤掉不需要的字符串,只得到收视率?我尝试将 UDF 用作:

 ratings_udf = udf(lambda imdbRating: imdbRating if isinstance(imdbRating, float)  else None)

并尝试将其命名为:

ratings = imdb_data.sort('imdbRating').select('imdbRating')
filtered = rating.withColumn('imdbRating',ratings_udf(ratings.imdbRating))

上面的问题是,因为它尝试在每一行上调用 udf,数据帧的每一行都映射到一个 Row 类型,因此在所有值上返回 None

有什么简单的方法可以过滤掉这些数据吗? 任何帮助都感激不尽。谢谢

最后,我能够解决 it.The 问题,即存在一些损坏的数据,但并非所有字段都存在。首先,我尝试通过读取 pandas 中的 csv 文件来使用 pandas 作为:

pd_frame = pd.read_csv('imdb.csv', error_bad_lines=False)

这 skipped/dropped 列数少于实际列数的损坏行。我试图阅读上面的熊猫数据框,pd_frame,以激发使用:

imdb_data= spark.createDataFrame(pd_frame)

但在推断模式时由于不匹配而出现一些错误。原来 spark csv reader 有类似的东西,它会删除损坏的行:

imdb_data = spark.read.csv('imdb.csv', header='true', mode='DROPMALFORMED')