Spark SQL - 加入后保留一个结果

Spark SQL - Keep one result after join

我有两个数据框,我试图将它们连接在一起,但我意识到在我最初的实现中,我得到了不希望的结果:

// plain_txns_df.show(false)
+------------+---------------+---------------+-----------+
|txn_date_at |account_number |merchant_name  |txn_amount |
+------------+---------------+---------------+-----------+
|2020-04-08  |1234567        |Starbucks      |2.02       |
|2020-04-14  |1234567        |Starbucks      |2.86       |
|2020-04-14  |1234567        |Subway         |12.02      |
|2020-04-14  |1234567        |Amazon         |3.21       |
+------------+---------------+---------------+-----------+
// richer_txns_df.show(false)
+----------+-------+----------------------+-------------+
|TXN_DT    |ACCT_NO|merch_name            |merchant_city|
+----------+-------+----------------------+-------------+
|2020-04-08|1234567|Subway                |Toronto      |
|2020-04-14|1234567|Subway                |Toronto      |
+----------+-------+----------------------+-------------+

根据以上两个数据框,我的目标是丰富与商家城市的普通交易,对于 7 天内的交易 window(即来自更丰富的交易数据框的交易日期应该介于普通日期和普通日期 - 7 天。

最初我认为这很简单,所以加入了数据(我知道范围加入):

spark.sql(
    """
      | SELECT
      | plain.txn_date_at,
      | plain.account_number,
      | plain.merchant_name,
      | plain.txn_amount,
      | richer.merchant_city
      | FROM plain_txns_df plain
      | LEFT JOIN richer_txns_df richer
      | ON plain.account_number = richer.ACCT_NO
      | AND plain.merchant_name = richer.merch_name
      | AND richer.txn_date BETWEEN date_sub(plain.txn_date_at, 7) AND plain.txn_date_at
    """.stripMargin)

但是,当使用上面的方法时,我得到了 4 月 14 日交易的重复结果,因为商户详细信息和帐户详细信息与 8 日的更丰富的记录匹配并且符合日期范围:

+------------+---------------+---------------+-----------+-------------+
|txn_date_at |account_number |merchant_name  |txn_amount |merchant_city|
+------------+---------------+---------------+-----------+-------------+
|2020-04-08  |1234567        |Starbucks      |2.02       |Toronto      |
|2020-04-14  |1234567        |Starbucks      |2.86       |Toronto      | // Apr-08 Richer record
|2020-04-14  |1234567        |Starbucks      |2.86       |Toronto      |
+------------+---------------+---------------+-----------+-------------+

有没有一种方法可以让我的普通 DataFrame 中的每个值只获得 一个 记录(即在上述结果集中获得第 14 个记录)? 我尝试 运行 在 连接后进行不同的 ,这解决了这个问题,但我意识到如果同一商家在同一天有两笔交易,我会丢失这些交易。

我正在考虑将更丰富的 table 移动到子查询,然后在其中应用日期过滤器,但我不知道如何将交易日期过滤器值传递到此查询:(。有些东西如下所示,但它不识别普通交易日期:

spark.sql(
    """
      | SELECT
      | plain.txn_date_at,
      | plain.account_number,
      | plain.merchant_name,
      | plain.txn_amount,
      | richer2.merchant_city
      | FROM plain_txns_df plain
      | LEFT JOIN ( 
      |    SELECT ACCT_NO, merch_name from richer_txns_df
      |    WHERE txn_date BETWEEN date_sub(plain.txn_date_at, 7) AND plain.txn_date_at
      | ) richer2
      | ON plain.account_number = richer2.ACCT_NO
      | AND plain.merchant_name = richer2.merch_name
    """.stripMargin)

我认为需要做的第一件事是在 plain_txns_df 上创建一个唯一键,这样可以在尝试 aggregate/compare 时区分行。

import org.apache.spark.sql.functions._
plainDf.withColumn("id", monotonically_increasing_id())

这样您就可以继续执行您发布的第一个查询(加上 id 列),returns 重复:

spark.sql("""
    SELECT
    plain.id,
    plain.txn_date_at,
    plain.account_number,
    plain.merchant_name,
    plain.txn_amount,
    richer.merchant_city,
    richer.txn_dt
    FROM plain_txns_df plain
    INNER JOIN richer_txns_df richer
    ON plain.account_number = richer.acc_no
    AND plain.merchant_name = richer.merch_name
    AND richer.txn_dt BETWEEN date_sub(plain.txn_date_at, 7) AND plain.txn_date_at
  """.stripMargin).createOrReplaceTempView("foo")

接下来通过获取给定 id.

的最新 richer_txns_df.txn_dt 日期的记录来删除上述数据帧的重复数据
spark.sql("""
    SELECT
    f1.txn_date_at,
    f1.account_number,
    f1.merchant_name,
    f1.txn_amount,
    f1.merchant_city
    FROM foo f1
    LEFT JOIN foo f2
    ON f2.id = f1.id
    AND f2.txn_dt > f1.txn_dt
    WHERE f2.id IS NULL
  """.stripMargin).show