Summing total orders based on customer IDs with multiple delivery dates

I have delivery data. I want to sum the total ordered net_wt per cust_id. In most cases this is easy, since I can sum everything by grouping on cust_id. However, I also need to keep the delivery date, and that is where I run into trouble: some orders have multiple delivery dates, and I'm not sure how to keep at least the latest order date alongside the sum of all orders regardless of delivery date.

cust_id    date_delivery    log_type    net_wt
4776210    3/4/2021         Bulk        3880
4776210    3/4/2021         Bulk        6160
4787563    3/20/2021        Bulk        10360
4787563    3/20/2021        Bulk        3800
4787563    3/20/2021        Bulk        5020
4787563    3/20/2021        Bulk        2120
4787563    3/25/2021        Bulk        2100
4787563    3/25/2021        Bulk        2140
4792002    3/27/2021        Bulk        9042
4790494    3/25/2021        Bulk        3718
4790494    3/25/2021        Bulk        8102

Desired output

cust_id    date_delivery    log_type    total_order
4776210    3/4/2021         Bulk        10040
4787563    ????????         Bulk        25540
4790494    3/25/2021        Bulk        11820

Here is what I tried:

df.createOrReplaceTempView('df')
df_test = spark.sql("""
SELECT cust_id, date_delivery,
       SUM(net_wt) AS `total_order`
FROM df
GROUP BY 1
""")
display(df_test)

But it doesn't work: date_delivery appears in the SELECT but is neither in the GROUP BY nor wrapped in an aggregate, so Spark rejects the query. I want each cust_id to keep at least one corresponding date (the most recent delivery date would be fine).

Any help would be greatly appreciated. Thanks in advance!

You can also apply an aggregate function (such as max in this case) to the date column, provided it is typed as an actual date rather than a string. To that end, we first convert it to a date type with to_date.

import pyspark.sql.functions as F

# run this only on Spark >= 3.0: it falls back to the legacy date parser,
# which accepts single-digit months/days ('3/4/2021') under the 'MM/dd/yyyy' pattern
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# convert the string column to an actual date
df = df.withColumn('date_delivery', F.to_date('date_delivery', 'MM/dd/yyyy'))

# aggregate per customer: log type, total weight, and latest delivery date
# (aliasing directly, so no later rename is needed)
df_new = df\
  .groupby('cust_id')\
  .agg(
    F.max('log_type').alias('log_type'),
    F.sum('net_wt').alias('net_wt'),
    F.max('date_delivery').alias('date_delivery')
  )

df_new.show()

+-------+--------+------+-------------+
|cust_id|log_type|net_wt|date_delivery|
+-------+--------+------+-------------+
|4776210|    Bulk| 10040|   2021-03-04|
|4787563|    Bulk| 25540|   2021-03-25|
|4790494|    Bulk| 11820|   2021-03-25|
|4792002|    Bulk|  9042|   2021-03-27|
+-------+--------+------+-------------+
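
Side note: if you ever need to carry other columns from the most recent delivery row (not just its date), a window-function variant of the same idea works as well. A minimal sketch, assuming df already has date_delivery converted to a date as above:

from pyspark.sql import Window
import pyspark.sql.functions as F

# rank each customer's rows by delivery date, newest first,
# and compute the customer-wide total in the same pass
w_latest = Window.partitionBy('cust_id').orderBy(F.col('date_delivery').desc())
w_total = Window.partitionBy('cust_id')

df_latest = df\
  .withColumn('rn', F.row_number().over(w_latest))\
  .withColumn('total_order', F.sum('net_wt').over(w_total))\
  .filter(F.col('rn') == 1)\
  .select('cust_id', 'date_delivery', 'log_type', 'total_order')

df_latest.show()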

Just add max on the date:

>>> df.show()
+-------+---------+----+----+
|     id|       dt| log|  wt|
+-------+---------+----+----+
|4776210| 3/4/2021|Bulk| 400|
|4776210|3/14/2021|Bulk|1400|
|4787563|3/24/2021|Bulk| 200|
|4787563|3/14/2021|Bulk|4400|
|4787563| 3/4/2021|Bulk| 500|
+-------+---------+----+----+

>>> df.createOrReplaceTempView('df')
>>> spark.sql('''select id, max(to_date(dt,'M/d/yyyy')) as dt,log,sum(wt) as wt from df group by id,log''').show()
+-------+----------+----+------+
|     id|        dt| log|    wt|
+-------+----------+----+------+
|4776210|2021-03-14|Bulk|1800.0|
|4787563|2021-03-24|Bulk|5100.0|
+-------+----------+----+------+
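
One caveat: the to_date inside max is essential. If dt stays a string, max compares lexicographically, so for the sample above it would return '3/4/2021' for both ids (because '4' sorts after '1' and '2' in the third character), not the actual latest dates:

>>> # WRONG: lexicographic max on the raw strings picks '3/4/2021' over '3/24/2021'
>>> spark.sql('''select id, max(dt) as dt from df group by id''').show()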