在 python 中按时加入两个 spark 数据帧 (TimestampType)
Joining two spark dataframes on time (TimestampType) in python
我有两个数据框,我想基于一列加入它们,需要注意的是这一列是时间戳,并且该时间戳必须在一定的偏移量(5 秒)内才能加入记录.更具体地说,dates_df
和 date=1/3/2015:00:00:00
中的记录应该与 events_df
和 time=1/3/2015:00:00:01
连接,因为两个时间戳彼此相差 5 秒以内。
我试图让这个逻辑与 python spark 一起工作,这非常痛苦。人们如何在 spark 中进行这样的连接?
我的方法是向 dates_df
添加两个额外的列,它们将确定 lower_timestamp
和 upper_timestamp
具有 5 秒偏移量的边界,并执行条件连接。这就是它失败的地方,更具体地说:
joined_df = dates_df.join(events_df,
dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)
joined_df.explain()
仅捕获查询的最后部分:
Filter (time#6 < upper_timestamp#4)
CartesianProduct
....
它给了我一个错误的结果。
我真的必须对每个不等式进行完整的笛卡尔连接,同时删除重复项吗?
完整代码如下:
from datetime import datetime, timedelta
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
master = 'local[*]'
app_name = 'Whosebug_join'
conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
def lower_range_func(x, offset=5):
return x - timedelta(seconds=offset)
def upper_range_func(x, offset=5):
return x + timedelta(seconds=offset)
lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())
dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
dates_schema = StructType(dates_fields)
dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
dates_df = sqlContext.createDataFrame(dates, dates_schema)
dates_df.show()
# extend dates_df with time ranges
dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
withColumn('upper_timestamp', upper_range(dates_df['date']))
event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
event_schema = StructType(event_fields)
events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
events_df = sqlContext.createDataFrame(events, event_schema)
events_df.show()
# finally, join the data
joined_df = dates_df.join(events_df,
dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)
joined_df.show()
我得到以下输出:
+-----+--------------------+
| name| date|
+-----+--------------------+
|day_1|2015-01-01 00:00:...|
|day_2|2015-01-02 00:00:...|
|day_3|2015-01-03 00:00:...|
|day_4|2015-01-04 00:00:...|
+-----+--------------------+
+--------------------+-------+
| time| event|
+--------------------+-------+
|2015-01-03 00:00:...|meeting|
+--------------------+-------+
+-----+--------------------+--------------------+--------------------+--------------------+-------+
| name| date| lower_timestamp| upper_timestamp| time| event|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
|day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
|day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
我用 explain()
进行了 spark SQL 查询以查看它是如何完成的,并在 python 中复制了相同的行为。首先是如何使用 SQL spark:
dates_df.registerTempTable("dates")
events_df.registerTempTable("events")
results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time and events.time < dates.upper_timestamp")
results.explain()
这行得通,但问题是如何在 python 中做到这一点,因此解决方案似乎只是一个普通的连接,然后是两个过滤器:
joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp)
joined_df.explain()
产生与 sql spark results.explain()
相同的查询,所以我假设事情是这样完成的。
虽然晚了一年,但也许能帮到别人..
如您所说,完整的笛卡尔积在您的情况下是疯狂的。您的匹配记录将在时间上接近(5 分钟),因此您可以利用这一点并节省大量时间,如果您首先根据时间戳将记录分组到桶中,然后加入该桶上的两个数据帧,然后才应用过滤器。使用该方法会导致 Spark 使用 SortMergeJoin 而不是 CartesianProduct,从而大大提高性能。
这里有一个小警告 - 您必须同时匹配桶和下一个桶。
最好在我的博客中通过工作代码示例进行解释(Scala + Spark 2.0,但您也可以在 python 中实现相同的...)
http://zachmoshe.com/2016/09/26/efficient-range-joins-with-spark.html
我有两个数据框,我想基于一列加入它们,需要注意的是这一列是时间戳,并且该时间戳必须在一定的偏移量(5 秒)内才能加入记录.更具体地说,dates_df
和 date=1/3/2015:00:00:00
中的记录应该与 events_df
和 time=1/3/2015:00:00:01
连接,因为两个时间戳彼此相差 5 秒以内。
我试图让这个逻辑与 python spark 一起工作,这非常痛苦。人们如何在 spark 中进行这样的连接?
我的方法是向 dates_df
添加两个额外的列,它们将确定 lower_timestamp
和 upper_timestamp
具有 5 秒偏移量的边界,并执行条件连接。这就是它失败的地方,更具体地说:
joined_df = dates_df.join(events_df,
dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)
joined_df.explain()
仅捕获查询的最后部分:
Filter (time#6 < upper_timestamp#4)
CartesianProduct
....
它给了我一个错误的结果。
我真的必须对每个不等式进行完整的笛卡尔连接,同时删除重复项吗?
完整代码如下:
from datetime import datetime, timedelta
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
master = 'local[*]'
app_name = 'Whosebug_join'
conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
def lower_range_func(x, offset=5):
return x - timedelta(seconds=offset)
def upper_range_func(x, offset=5):
return x + timedelta(seconds=offset)
lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())
dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
dates_schema = StructType(dates_fields)
dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
dates_df = sqlContext.createDataFrame(dates, dates_schema)
dates_df.show()
# extend dates_df with time ranges
dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
withColumn('upper_timestamp', upper_range(dates_df['date']))
event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
event_schema = StructType(event_fields)
events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
events_df = sqlContext.createDataFrame(events, event_schema)
events_df.show()
# finally, join the data
joined_df = dates_df.join(events_df,
dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)
joined_df.show()
我得到以下输出:
+-----+--------------------+
| name| date|
+-----+--------------------+
|day_1|2015-01-01 00:00:...|
|day_2|2015-01-02 00:00:...|
|day_3|2015-01-03 00:00:...|
|day_4|2015-01-04 00:00:...|
+-----+--------------------+
+--------------------+-------+
| time| event|
+--------------------+-------+
|2015-01-03 00:00:...|meeting|
+--------------------+-------+
+-----+--------------------+--------------------+--------------------+--------------------+-------+
| name| date| lower_timestamp| upper_timestamp| time| event|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
|day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
|day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
我用 explain()
进行了 spark SQL 查询以查看它是如何完成的,并在 python 中复制了相同的行为。首先是如何使用 SQL spark:
dates_df.registerTempTable("dates")
events_df.registerTempTable("events")
results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time and events.time < dates.upper_timestamp")
results.explain()
这行得通,但问题是如何在 python 中做到这一点,因此解决方案似乎只是一个普通的连接,然后是两个过滤器:
joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp)
joined_df.explain()
产生与 sql spark results.explain()
相同的查询,所以我假设事情是这样完成的。
虽然晚了一年,但也许能帮到别人..
如您所说,完整的笛卡尔积在您的情况下是疯狂的。您的匹配记录将在时间上接近(5 分钟),因此您可以利用这一点并节省大量时间,如果您首先根据时间戳将记录分组到桶中,然后加入该桶上的两个数据帧,然后才应用过滤器。使用该方法会导致 Spark 使用 SortMergeJoin 而不是 CartesianProduct,从而大大提高性能。
这里有一个小警告 - 您必须同时匹配桶和下一个桶。
最好在我的博客中通过工作代码示例进行解释(Scala + Spark 2.0,但您也可以在 python 中实现相同的...)
http://zachmoshe.com/2016/09/26/efficient-range-joins-with-spark.html