使用条件返回 PySpark DataFrame 的行
Returning Rows of PySpark DataFrame by using conditions
我有两个不同长度的数据帧。第一个看起来像这样
A | Time_Stop | B
------ | ----------------------|-----
Green | 2016-10-01 00:10:15 | 77
Yellow | 2016-10-03 00:11:15 | 80
Blue | 2016-10-04 00:12:15 | 6
第二个看起来像这样
D | Time_Start | Z
------ | ----------------------|-----
Foo | 2016-10-01 00:12:15 | 7
Cookie | 2016-10-03 00:45:15 | 99
我的目标是 return 只有第一个数据帧中在特定时间限制内(比如 5 分钟内)的行,因此输出帧应该看起来像这样
A | Time_Stop | B
------ | ----------------------|-----
Green | 2016-10-01 00:10:15 | 77
我无法弄清楚这一点。到目前为止,我已经尝试这样做
from pyspark.sql import functions as F
timeFmt = "yyyy-MM-dd' 'HH:mm:ss"
result = df.where(F.unix_timestamp(df1.Time_Start, format = timeFmt) - F.unix_timestamp(df.Time_Stop, format = timeFmt) <= 300)
但是这不起作用。我怎样才能达到我想要的结果?
编辑:我忘了提到两个 DataFrame 的时间列都是字符串格式。
编辑 2:我已尝试以下操作,但收到错误消息。
from pyspark.sql.functions import expo
df2 = df2.withColumn("Time_Start", df2["Time_Start"].cast("timestamp"))
df = df.withColumn("Time_Stop", df['Time_Stop'].cast('timestamp'))
condition = df.Time_Stop + expr("INTERVAL 10 MINUTES") <= df2.Time_Start
df.filter(condition).show()
AnalysisException: u'resolved attribute(s) starttime#2251 missing from pickup_time#1964,dropoff_latitude#2090,tip#2180,dropoff_longitude#2072,pickup_latitude#2036,pickup_longitude#2018,payment_type#2108,dropoff_time#2268,mta_tax#2162,trip_distance#2000,fare_amount#2126,toll#2198,rate_code#2054,total#2216,row#1946,surcharge#2144 in operator !Filter (cast(dropoff_time#2268 + interval 10 minutes as timestamp) <= starttime#2251);'
编辑 3:我能够使用我的本地计算机完成此操作,但我认为当我将其传输到集群上的 运行 时,我的代码无法很好地转换。这是我的代码,也许有人可以指出使它 运行 更快或看起来更干净的方法。我仍然让这个问题悬而未决。
df = list(df.toLocalIterator())
df1 = list(df1.toLocalIterator())
rand = []
for i in df:
for j in df1:
elapsed_time = (i['Time_Start'] - j['Time_Stop']).total_seconds()
time_limit = 600
if (abs(elapsed_time) <= time_limit):
rand.append(j)
rand = list(set(rand))
将 toLocalIterator()
与 list()
一起使用(其工作方式与 collect()
完全相同)并且循环在大数据集上的效率非常低(它根本不使用 spark 功能)。
笛卡尔连接似乎是这种情况下的最佳解决方案。让我们用 Time_Stop
和 firstDF
调用 DF,用 Time_Start
调用 DF:secondDF
,两者都将日期转换为时间戳。
然后尝试以下操作:
from pyspark.sql import functions as F
interval = F.unix_timestamp(secondDF.Time_Start) - F.unix_timestamp(firstDF.Time_Stop)
firstDF.join(secondDF).where(F.abs(interval) < 300).select('A', 'Time_Stop', 'B')
我有两个不同长度的数据帧。第一个看起来像这样
A | Time_Stop | B
------ | ----------------------|-----
Green | 2016-10-01 00:10:15 | 77
Yellow | 2016-10-03 00:11:15 | 80
Blue | 2016-10-04 00:12:15 | 6
第二个看起来像这样
D | Time_Start | Z
------ | ----------------------|-----
Foo | 2016-10-01 00:12:15 | 7
Cookie | 2016-10-03 00:45:15 | 99
我的目标是 return 只有第一个数据帧中在特定时间限制内(比如 5 分钟内)的行,因此输出帧应该看起来像这样
A | Time_Stop | B
------ | ----------------------|-----
Green | 2016-10-01 00:10:15 | 77
我无法弄清楚这一点。到目前为止,我已经尝试这样做
from pyspark.sql import functions as F
timeFmt = "yyyy-MM-dd' 'HH:mm:ss"
result = df.where(F.unix_timestamp(df1.Time_Start, format = timeFmt) - F.unix_timestamp(df.Time_Stop, format = timeFmt) <= 300)
但是这不起作用。我怎样才能达到我想要的结果?
编辑:我忘了提到两个 DataFrame 的时间列都是字符串格式。
编辑 2:我已尝试以下操作,但收到错误消息。
from pyspark.sql.functions import expo
df2 = df2.withColumn("Time_Start", df2["Time_Start"].cast("timestamp"))
df = df.withColumn("Time_Stop", df['Time_Stop'].cast('timestamp'))
condition = df.Time_Stop + expr("INTERVAL 10 MINUTES") <= df2.Time_Start
df.filter(condition).show()
AnalysisException: u'resolved attribute(s) starttime#2251 missing from pickup_time#1964,dropoff_latitude#2090,tip#2180,dropoff_longitude#2072,pickup_latitude#2036,pickup_longitude#2018,payment_type#2108,dropoff_time#2268,mta_tax#2162,trip_distance#2000,fare_amount#2126,toll#2198,rate_code#2054,total#2216,row#1946,surcharge#2144 in operator !Filter (cast(dropoff_time#2268 + interval 10 minutes as timestamp) <= starttime#2251);'
编辑 3:我能够使用我的本地计算机完成此操作,但我认为当我将其传输到集群上的 运行 时,我的代码无法很好地转换。这是我的代码,也许有人可以指出使它 运行 更快或看起来更干净的方法。我仍然让这个问题悬而未决。
df = list(df.toLocalIterator())
df1 = list(df1.toLocalIterator())
rand = []
for i in df:
for j in df1:
elapsed_time = (i['Time_Start'] - j['Time_Stop']).total_seconds()
time_limit = 600
if (abs(elapsed_time) <= time_limit):
rand.append(j)
rand = list(set(rand))
将 toLocalIterator()
与 list()
一起使用(其工作方式与 collect()
完全相同)并且循环在大数据集上的效率非常低(它根本不使用 spark 功能)。
笛卡尔连接似乎是这种情况下的最佳解决方案。让我们用 Time_Stop
和 firstDF
调用 DF,用 Time_Start
调用 DF:secondDF
,两者都将日期转换为时间戳。
然后尝试以下操作:
from pyspark.sql import functions as F
interval = F.unix_timestamp(secondDF.Time_Start) - F.unix_timestamp(firstDF.Time_Stop)
firstDF.join(secondDF).where(F.abs(interval) < 300).select('A', 'Time_Stop', 'B')