Pyspark - Selecting Distinct Values in Column after groupby and orderBy
So my table looks like this:
+-------------------+-------+----------+------------+
| trip_id|line_id| ef_ar_ts| station|
+-------------------+-------+----------+------------+
|80:06____:17401:000| 17401| 0|Schaffhausen|
|80:06____:17402:000| 17402|1505278458|Schaffhausen|
|80:06____:17403:000| 17403| 0|Schaffhausen|
|80:06____:17406:000| 17406|1505282110|Schaffhausen|
|80:06____:17409:000| 17409| 0|Schaffhausen|
|80:06____:17410:000| 17410|1505285757|Schaffhausen|
|80:06____:17411:000| 17411| 0|Schaffhausen|
|80:06____:17416:000| 17416|1505292890|Schaffhausen|
|80:06____:17417:000| 17417| 0|Schaffhausen|
|80:06____:17418:000| 17418|1505296501|Schaffhausen|
|80:06____:17419:000| 17419| 0|Schaffhausen|
|80:06____:17420:000| 17420|1505300253|Schaffhausen|
|80:06____:17421:000| 17421| 0|Schaffhausen|
|80:06____:17422:000| 17422|1505303814|Schaffhausen|
|80:06____:17423:000| 17423| 0|Schaffhausen|
|80:06____:17425:000| 17425| 0|Schaffhausen|
|80:06____:17426:000| 17426|1505307355|Schaffhausen|
|80:06____:17427:000| 17427| 0|Schaffhausen|
|80:06____:17428:000| 17428|1505310983|Schaffhausen|
|80:06____:17429:000| 17429| 0|Schaffhausen|
+-------------------+-------+----------+------------+
This is a train dataset. What I want to do is:
Group by the train's line_id, so I have all my stations together with their lines; sort within each group by ef_ar_ts; then take the ordered SET of stations: one list per line_id. That way my stations are in order and I can reconstruct the whole line.
What I have tried so far:
from pyspark.sql.functions import udf

@udf
def keepline(df):
    """Keep lines split."""
    firstline = data1.first().trip_id
    dftemp = df.where(data1.trip_id == firstline)
    data1 = data1.fillna({'ef_ar_ts': 0})
    dftemp = dftemp.orderBy('ef_ar_ts')
    return mylist

data2 = data1.select('*').groupby(data1.line_id).agg(udfmyfunc)
Any help? Thanks in advance!
We can group by line_id, collect the ef_ar_ts and station columns, and sort the two collections with a UDF. Hope this helps.
Since your dataframe has the same value everywhere in station, I added two dummystation rows for reference:
+-------------------+-------+----------+-------------+
| trip_id|line_id| ef_ar_ts| station|
+-------------------+-------+----------+-------------+
|80:06____:17401:000| 17401| 0| Schaffhausen|
|80:06____:17402:000| 17402|1505278458| Schaffhausen|
|80:06____:17403:000| 17403| 0| Schaffhausen|
......................................................
......................................................
|80:06____:17427:000| 17427| 0| Schaffhausen|
|80:06____:17428:000| 17428|1505310983| Schaffhausen|
|80:06____:17429:000| 17429| 0| Schaffhausen|
|80:06____:17429:000| 17401|1505278478|dummystation1|
|80:06____:17429:000| 17429|1505307355|dummystation2|
+-------------------+-------+----------+-------------+
## group and collect for each line id ##
from pyspark.sql import functions as F

df1 = df.groupby('line_id').agg(
    F.collect_list('ef_ar_ts').alias('ef_ar_ts'),
    F.collect_list('station').alias('station')
)
+-------+---------------+-----------------------------+
|line_id|ef_ar_ts |station |
+-------+---------------+-----------------------------+
|17419 |[0] |[Schaffhausen] |
|17420 |[1505300253] |[Schaffhausen] |
|17403 |[0] |[Schaffhausen] |
|17406 |[1505282110] |[Schaffhausen] |
|17428 |[1505310983] |[Schaffhausen] |
|17421 |[0] |[Schaffhausen] |
|17427 |[0] |[Schaffhausen] |
|17411 |[0] |[Schaffhausen] |
|17416 |[1505292890] |[Schaffhausen] |
|17429 |[0, 1505307355]|[Schaffhausen, dummystation2]|
|17401 |[0, 1505278478]|[Schaffhausen, dummystation1]|
|17423 |[0] |[Schaffhausen] |
|17417 |[0] |[Schaffhausen] |
|17402 |[1505278458] |[Schaffhausen] |
|17418 |[1505296501] |[Schaffhausen] |
|17425 |[0] |[Schaffhausen] |
|17409 |[0] |[Schaffhausen] |
|17422 |[1505303814] |[Schaffhausen] |
|17426 |[1505307355] |[Schaffhausen] |
|17410 |[1505285757] |[Schaffhausen] |
+-------+---------------+-----------------------------+
## a UDF to merge both collections and sort them ##
from operator import itemgetter
from pyspark.sql.types import ArrayType, StringType

udf1 = F.udf(lambda x, y: [st[1] for st in sorted(zip(x, y), key=itemgetter(0))],
             ArrayType(StringType()))
df1.select('line_id',udf1('ef_ar_ts','station').alias('stations')).show(truncate=False)
+-------+-----------------------------+
|line_id|stations |
+-------+-----------------------------+
|17419 |[Schaffhausen] |
|17420 |[Schaffhausen] |
|17403 |[Schaffhausen] |
|17406 |[Schaffhausen] |
|17428 |[Schaffhausen] |
|17421 |[Schaffhausen] |
|17427 |[Schaffhausen] |
|17411 |[Schaffhausen] |
|17416 |[Schaffhausen] |
|17429 |[Schaffhausen, dummystation2]|
|17401 |[Schaffhausen, dummystation1]|
|17423 |[Schaffhausen] |
|17417 |[Schaffhausen] |
|17402 |[Schaffhausen] |
|17418 |[Schaffhausen] |
|17425 |[Schaffhausen] |
|17409 |[Schaffhausen] |
|17422 |[Schaffhausen] |
|17426 |[Schaffhausen] |
|17410 |[Schaffhausen] |
+-------+-----------------------------+
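The sorting core of the UDF is plain Python, so it can be sanity-checked without a Spark session. A minimal sketch (the function name and the sample timestamps below are illustrative, not from the original post):

```python
from operator import itemgetter

def stations_in_order(timestamps, stations):
    # Pair each station with its arrival timestamp, sort the pairs
    # by timestamp, and keep only the station names in that order.
    return [st for _, st in sorted(zip(timestamps, stations), key=itemgetter(0))]

# Hypothetical per-line collections, as collect_list might produce them
ts = [0, 1505307355, 1505278478]
st = ["Schaffhausen", "dummystation2", "dummystation1"]
print(stations_in_order(ts, st))
# → ['Schaffhausen', 'dummystation1', 'dummystation2']
```

One caveat: zipping two separate collect_list results assumes both aggregations saw the rows in the same order, which Spark does not guarantee after a shuffle. Collecting a single struct column of (ef_ar_ts, station) pairs and sorting that is the safer variant when this assumption is in doubt.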