带有数据框查询的 PySpark UDF 函数?
PySpark UDF function with data frame query?
我有另一个解决方案,但我更喜欢使用 PySpark 2.3 来实现。
我有一个像这样的二维 PySpark 数据框:
Date | ID
---------- | ----
08/31/2018 | 10
09/31/2018 | 10
09/01/2018 | null
09/01/2018 | null
09/01/2018 | 12
我想通过寻找最近的过去来替换 ID
空值,或者如果该值是空值,则通过向前看(如果它再次为空值,则设置默认值)
我曾设想用 .withColumn
添加一个新列,并使用一个 UDF 函数来查询数据框本身。
类似伪代码的东西(不完美,但它是主要思想):
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def return_value(value,date):
if value is not null:
return val
value1 = df.filter(df['date']<= date).select(df['value']).collect()
if (value1)[0][0] is not null:
return (value1)[0][0]
value2 = df.filter(tdf['date']>= date).select(df['value']).collect()
return (value2)[0][0]
value_udf = udf(return_value,StringType())
new_df = tr.withColumn("new_value", value_udf(df.value,df.date))
但是不行。我是不是完全做错了?是否只能在 UDF 函数中查询 Spark 数据框?我错过了更简单的解决方案吗?
创建具有一列的新数据框 - 所有日期的唯一列表:
datesDF = yourDF.select('Date').distinct()
创建另一个包含日期和 ID 的日期和 ID,但仅包含没有空值的日期和 ID。并且还让每个日期只保留第一个(无论是第一个)出现的 ID(从您的示例来看,每个日期可以有多个行)
noNullsDF = yourDF.dropna().dropDuplicates(subset='Date')
现在让我们加入这两个,这样我们就有了所有日期的列表,无论我们有什么值(或空值)
joinedDF = datesDF.join(noNullsDF, 'Date', 'left')
现在,对于每个日期,使用 window 函数从上一个日期和下一个日期获取 ID 的值,还可以重命名我们的 ID 列,这样以后连接的问题就会减少:
from pyspark.sql.window import Window
from pyspark.sql import functions as f
w = Window.orderBy('Date')
joinedDF = joinedDF.withColumn('previousID',f.lag('ID').over(w))
.withColumn('nextID',f.lead('ID').over(w))
.withColumnRenamed('ID','newID')
现在让我们按日期将其加入到我们原来的 Dataframe
yourDF = yourDF.join(joinedDF, 'Date', 'left')
现在我们的 Dataframe 有 4 个 ID 列:
- 原始ID
- newID - 给定日期的任何非空值的 ID(如果有)或 null
- previousID - 上一日期的 ID(非空如果有或空)
- nextID - 从下一个日期开始的 ID(非空如果有或空)
现在我们需要将它们按顺序组合成finalID:
- 如果不为空则为原始值
- 当前日期的值,如果存在任何非空值(这与您的问题相反,但您 pandas 代码建议您 <= 进行日期检查)如果结果不为空
- 前一个日期的值,如果它不为空
- 下一个日期的值,如果它不为空
- 一些默认值
我们只需合并即可:
default = 0
finalDF = yourDF.select('Date',
'ID',
f.coalesce('ID',
'newID',
'previousID',
'nextID',
f.lit(default)).alias('finalID')
)
我有另一个解决方案,但我更喜欢使用 PySpark 2.3 来实现。
我有一个像这样的二维 PySpark 数据框:
Date | ID
---------- | ----
08/31/2018 | 10
09/31/2018 | 10
09/01/2018 | null
09/01/2018 | null
09/01/2018 | 12
我想通过寻找最近的过去来替换 ID
空值,或者如果该值是空值,则通过向前看(如果它再次为空值,则设置默认值)
我曾设想用 .withColumn
添加一个新列,并使用一个 UDF 函数来查询数据框本身。
类似伪代码的东西(不完美,但它是主要思想):
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def return_value(value,date):
if value is not null:
return val
value1 = df.filter(df['date']<= date).select(df['value']).collect()
if (value1)[0][0] is not null:
return (value1)[0][0]
value2 = df.filter(tdf['date']>= date).select(df['value']).collect()
return (value2)[0][0]
value_udf = udf(return_value,StringType())
new_df = tr.withColumn("new_value", value_udf(df.value,df.date))
但是不行。我是不是完全做错了?是否只能在 UDF 函数中查询 Spark 数据框?我错过了更简单的解决方案吗?
创建具有一列的新数据框 - 所有日期的唯一列表:
datesDF = yourDF.select('Date').distinct()
创建另一个包含日期和 ID 的日期和 ID,但仅包含没有空值的日期和 ID。并且还让每个日期只保留第一个(无论是第一个)出现的 ID(从您的示例来看,每个日期可以有多个行)
noNullsDF = yourDF.dropna().dropDuplicates(subset='Date')
现在让我们加入这两个,这样我们就有了所有日期的列表,无论我们有什么值(或空值)
joinedDF = datesDF.join(noNullsDF, 'Date', 'left')
现在,对于每个日期,使用 window 函数从上一个日期和下一个日期获取 ID 的值,还可以重命名我们的 ID 列,这样以后连接的问题就会减少:
from pyspark.sql.window import Window
from pyspark.sql import functions as f
w = Window.orderBy('Date')
joinedDF = joinedDF.withColumn('previousID',f.lag('ID').over(w))
.withColumn('nextID',f.lead('ID').over(w))
.withColumnRenamed('ID','newID')
现在让我们按日期将其加入到我们原来的 Dataframe
yourDF = yourDF.join(joinedDF, 'Date', 'left')
现在我们的 Dataframe 有 4 个 ID 列:
- 原始ID
- newID - 给定日期的任何非空值的 ID(如果有)或 null
- previousID - 上一日期的 ID(非空如果有或空)
- nextID - 从下一个日期开始的 ID(非空如果有或空)
现在我们需要将它们按顺序组合成finalID:
- 如果不为空则为原始值
- 当前日期的值,如果存在任何非空值(这与您的问题相反,但您 pandas 代码建议您 <= 进行日期检查)如果结果不为空
- 前一个日期的值,如果它不为空
- 下一个日期的值,如果它不为空
- 一些默认值
我们只需合并即可:
default = 0
finalDF = yourDF.select('Date',
'ID',
f.coalesce('ID',
'newID',
'previousID',
'nextID',
f.lit(default)).alias('finalID')
)