PySpark 按最接近的时间值连接两个数据帧
PySpark joining two data frames by closest time value
我有两个数据框(tx_df 和 login_df)。
第一个有列 player_id、tx_id 和 tx_time,而第二个有 player_id 和 login_time。
我想做的是使用 player_id 列连接这两个数据框,但除此之外,只连接来自 login_df 的最新登录行。
例如,如果有这样的tx_df:
pid_1, txid_1, '2016-11-16 00:01:00'
pid_1, txid_2, '2016-11-16 00:01:02'
pid_1, txid_3, '2016-11-16 00:02:15'
pid_1, txid_4, '2016-11-16 00:02:16'
pid_1, txid_5, '2016-11-16 00:02:17'
和login_df像这样:
pid_1, '2016-11-16 00:02:10'
pid_1, '2016-11-16 00:00:55'
pid_1, '2016-11-13 00:03:00'
pid_1, '2016-11-10 16:30:00'
我希望生成的数据框如下所示:
pid_1, txid_1, '2016-11-16 00:01:00', pid_1, '2016-11-16 00:00:55'
pid_1, txid_2, '2016-11-16 00:01:02', pid_1, '2016-11-16 00:00:55'
pid_1, txid_3, '2016-11-16 00:02:15', pid_1, '2016-11-16 00:02:10'
pid_1, txid_4, '2016-11-16 00:02:16', pid_1, '2016-11-16 00:02:10'
pid_1, txid_5, '2016-11-16 00:02:17', pid_1, '2016-11-16 00:02:10'
我不是强制绑定到数据帧的,所以如果能提示如何使用 RDD 或任何其他方法很好地做到这一点,我们将不胜感激。
我担心的是数据爆炸,因为 tx_df 每个玩家 ID 可能有数千个交易条目(然后是数千个玩家 ID),而 login_df 也可能有未知数量的玩家登录信息。简单地在 player_id 上连接这两个将创建一个巨大的数据框,因为笛卡尔积是不可接受的。
注意:我正在为 Spark 使用 Python API。
为了将来参考,我设法用稍微不同的方法解决了这个问题。
我很幸运,第二个数据帧足够小,可以播放它。更准确地说,我广播了值的哈希图,但这只是因为我发现它非常适合这个目的。 (参见:broadcast variables in Spark)
然后,我像这样遍历第一个数据框的行
tx_df.rdd.map(my_map_function)
并在 my_map_function 中,我访问了广播的 hasmap,进行了排序和其他操作,最后选择了我想将哪些值附加到第一个数据帧的行。
作为广播值散列图的一个很好的副作用,我能够删除数据帧的连接并加快速度。
在这样做之前,脚本有
- 正在将数据加载到数据帧中
- 将数据帧合并成一个大数据帧
- 过滤掉大数据框不需要的行
这个广播解决方案后,脚本有
- 正在将数据加载到数据帧中
- 第二个的广播值
- 只迭代第一个,直接访问第二个的值并将它们附加到当前行
第二种方法不需要过滤,因为已经选择了正确的值,因此脚本执行速度更快。
我有两个数据框(tx_df 和 login_df)。 第一个有列 player_id、tx_id 和 tx_time,而第二个有 player_id 和 login_time。
我想做的是使用 player_id 列连接这两个数据框,但除此之外,只连接来自 login_df 的最新登录行。 例如,如果有这样的tx_df:
pid_1, txid_1, '2016-11-16 00:01:00'
pid_1, txid_2, '2016-11-16 00:01:02'
pid_1, txid_3, '2016-11-16 00:02:15'
pid_1, txid_4, '2016-11-16 00:02:16'
pid_1, txid_5, '2016-11-16 00:02:17'
和login_df像这样:
pid_1, '2016-11-16 00:02:10'
pid_1, '2016-11-16 00:00:55'
pid_1, '2016-11-13 00:03:00'
pid_1, '2016-11-10 16:30:00'
我希望生成的数据框如下所示:
pid_1, txid_1, '2016-11-16 00:01:00', pid_1, '2016-11-16 00:00:55'
pid_1, txid_2, '2016-11-16 00:01:02', pid_1, '2016-11-16 00:00:55'
pid_1, txid_3, '2016-11-16 00:02:15', pid_1, '2016-11-16 00:02:10'
pid_1, txid_4, '2016-11-16 00:02:16', pid_1, '2016-11-16 00:02:10'
pid_1, txid_5, '2016-11-16 00:02:17', pid_1, '2016-11-16 00:02:10'
我不是强制绑定到数据帧的,所以如果能提示如何使用 RDD 或任何其他方法很好地做到这一点,我们将不胜感激。
我担心的是数据爆炸,因为 tx_df 每个玩家 ID 可能有数千个交易条目(然后是数千个玩家 ID),而 login_df 也可能有未知数量的玩家登录信息。简单地在 player_id 上连接这两个将创建一个巨大的数据框,因为笛卡尔积是不可接受的。
注意:我正在为 Spark 使用 Python API。
为了将来参考,我设法用稍微不同的方法解决了这个问题。 我很幸运,第二个数据帧足够小,可以播放它。更准确地说,我广播了值的哈希图,但这只是因为我发现它非常适合这个目的。 (参见:broadcast variables in Spark)
然后,我像这样遍历第一个数据框的行
tx_df.rdd.map(my_map_function)
并在 my_map_function 中,我访问了广播的 hasmap,进行了排序和其他操作,最后选择了我想将哪些值附加到第一个数据帧的行。
作为广播值散列图的一个很好的副作用,我能够删除数据帧的连接并加快速度。 在这样做之前,脚本有
- 正在将数据加载到数据帧中
- 将数据帧合并成一个大数据帧
- 过滤掉大数据框不需要的行
这个广播解决方案后,脚本有
- 正在将数据加载到数据帧中
- 第二个的广播值
- 只迭代第一个,直接访问第二个的值并将它们附加到当前行
第二种方法不需要过滤,因为已经选择了正确的值,因此脚本执行速度更快。