PySpark:根据另一个数据框的日期范围过滤数据
PySpark: Filter data based on a date range from another data frame
如果 df1.date1
位于 df2.date2
和 df2.date3
之间(只有三个范围),我正在尝试 select 来自 df1 的记录允许 date2、date3 组合,按行)。
在我的例子中,没有通用变量来建立 'join' 标准。我尝试了不同的 pyspark.sql 函数,例如 'filter'、'when'、'withColumn'、'date_sub'、'date_add' 等,但无法找到解决方案。
我确实看过几个 SO 帖子,但是,他们中的大多数建议使用 'join',这可能不适合我的问题!
df1
+----------+-----------+
| emp_id | date1 |
+----------+-----------+
| 67891 | 11-13-2015|
| 12345 | 02-28-2017|
| 34567 | 04-07-2017|
+----------+-----------+
df2
+------------+------------+
| date2 | date3 |
+------------+------------+
|01-28-2017 | 03-15-2017 |
|07-13-2017 | 11-13-2017 |
|06-07-2018 | 09-07-2018 |
+------------+------------+
预期记录:
+----------+-----------+
| emp_id | date1 |
+----------+-----------+
| 12345 | 02-28-2017|
+----------+-----------+
您可以在 spark 中进行非相等连接。您不一定需要匹配的密钥。
这是在 scala 中,我很确定它在 python 中几乎相同。让我知道它是否不起作用。也会更新 pyspark 中的答案。
scala> df1.join(df2 , 'date1 > 'date2 && 'date1 < 'date3).show
+------+----------+----------+----------+
|emp_id| date1| date2| date3|
+------+----------+----------+----------+
| 12345|02-28-2017|01-28-2017|03-15-2017|
+------+----------+----------+----------+
Pyspark 解决方案:
>>> from pyspark.sql.functions import unix_timestamp
>>> from pyspark.sql.functions import from_unixtime
>>> x = [(67891 ,'11-13-2015'),(12345, '02-28-2017'),(34567,'04-07-2017')]
>>> df1 = spark.createDataFrame(x,['emp_id','date1'])
>>> y = [('01-28-2017','03-15-2017'),('07-13-2017','11-13-2017'),('06-07-2018','09-07-2018')]
>>> df2 = spark.createDataFrame(y,['date2','date3'])
>>> df1a = df1.select('emp_id', from_unixtime(unix_timestamp('date1', 'MM-dd-yyy')).alias('date1'))
>>> df2a = df2.select(from_unixtime(unix_timestamp('date2', 'MM-dd-yyy')).alias('date2'),from_unixtime(unix_timestamp('date3', 'MM-dd-yyy')).alias('date3'))
>>> df1a.join(df2a, on=[df1a['date1'] > df2a['date2'], df1a['date1'] < df2a['date3']]).show()
+------+-------------------+-------------------+-------------------+
|emp_id| date1| date2| date3|
+------+-------------------+-------------------+-------------------+
| 12345|2017-02-28 00:00:00|2017-01-28 00:00:00|2017-03-15 00:00:00|
+------+-------------------+-------------------+-------------------+
如果 df1.date1
位于 df2.date2
和 df2.date3
之间(只有三个范围),我正在尝试 select 来自 df1 的记录允许 date2、date3 组合,按行)。
在我的例子中,没有通用变量来建立 'join' 标准。我尝试了不同的 pyspark.sql 函数,例如 'filter'、'when'、'withColumn'、'date_sub'、'date_add' 等,但无法找到解决方案。
我确实看过几个 SO 帖子,但是,他们中的大多数建议使用 'join',这可能不适合我的问题!
df1
+----------+-----------+
| emp_id | date1 |
+----------+-----------+
| 67891 | 11-13-2015|
| 12345 | 02-28-2017|
| 34567 | 04-07-2017|
+----------+-----------+
df2
+------------+------------+
| date2 | date3 |
+------------+------------+
|01-28-2017 | 03-15-2017 |
|07-13-2017 | 11-13-2017 |
|06-07-2018 | 09-07-2018 |
+------------+------------+
预期记录:
+----------+-----------+
| emp_id | date1 |
+----------+-----------+
| 12345 | 02-28-2017|
+----------+-----------+
您可以在 spark 中进行非相等连接。您不一定需要匹配的密钥。
这是在 scala 中,我很确定它在 python 中几乎相同。让我知道它是否不起作用。也会更新 pyspark 中的答案。
scala> df1.join(df2 , 'date1 > 'date2 && 'date1 < 'date3).show
+------+----------+----------+----------+
|emp_id| date1| date2| date3|
+------+----------+----------+----------+
| 12345|02-28-2017|01-28-2017|03-15-2017|
+------+----------+----------+----------+
Pyspark 解决方案:
>>> from pyspark.sql.functions import unix_timestamp
>>> from pyspark.sql.functions import from_unixtime
>>> x = [(67891 ,'11-13-2015'),(12345, '02-28-2017'),(34567,'04-07-2017')]
>>> df1 = spark.createDataFrame(x,['emp_id','date1'])
>>> y = [('01-28-2017','03-15-2017'),('07-13-2017','11-13-2017'),('06-07-2018','09-07-2018')]
>>> df2 = spark.createDataFrame(y,['date2','date3'])
>>> df1a = df1.select('emp_id', from_unixtime(unix_timestamp('date1', 'MM-dd-yyy')).alias('date1'))
>>> df2a = df2.select(from_unixtime(unix_timestamp('date2', 'MM-dd-yyy')).alias('date2'),from_unixtime(unix_timestamp('date3', 'MM-dd-yyy')).alias('date3'))
>>> df1a.join(df2a, on=[df1a['date1'] > df2a['date2'], df1a['date1'] < df2a['date3']]).show()
+------+-------------------+-------------------+-------------------+
|emp_id| date1| date2| date3|
+------+-------------------+-------------------+-------------------+
| 12345|2017-02-28 00:00:00|2017-01-28 00:00:00|2017-03-15 00:00:00|
+------+-------------------+-------------------+-------------------+