如何使用 Python Core API (Apache Spark) 连接三个 RDD?

How to join three RDDs using the Python Core API (Apache Spark)?

我正在尝试使用 Python 核心 API 将 RDD 连接在一起,这是通过 Apache Spark 实现的;然而,我没有运气来完成这个。

目前,我有这三个具有共同属性的RDD:

现在,当将两个 RDD 连接在一起时,它工作得很好,没有任何问题:

user_rev_rdd = (users_rdd
  .keyBy(lambda user: user['user_id'])
  .join(
      reviews_rdd.keyBy(lambda rev: rev['user_id'])
  )
)

虽然,为了将所有三个连接在一起,我已经尝试过这个,但由于某种原因它对我根本不起作用:

user_rev_com_rdd = (users_rdd
  .keyBy(lambda user: user['user_id'])
  .join(
      reviews_rdd.keyBy(lambda rev: rev['user_id'])
  )
 .join(
      companies_rdd.keyBy(lambda com: com['company_id'])
  )
)

任何关于如何将我的所有三个 RDD 连接在一起的帮助都会非常有帮助,因为我不确定如何正确地做这样的事情。谢谢

第一次加入后,密钥为user_id,但您加入companies_rdd的密钥为company_id,因此加入密钥不正确。您需要将密钥更改为 company_id,例如

user_rev_com_rdd = (users_rdd
    .keyBy(lambda user: user['user_id'])
    .join(
        reviews_rdd.keyBy(lambda rev: rev['user_id'])
    )
    .map(lambda r: (r[1][1]['company_id'], r[1]))
    .join(
        companies_rdd.keyBy(lambda com: com['company_id'])
    )
)

将三个RDD中的元素组合起来,加入后去掉joining key,可以在末尾加一个map

user_rev_com_rdd = (users_rdd
    .keyBy(lambda user: user['user_id'])
    .join(
        reviews_rdd.keyBy(lambda rev: rev['user_id'])
    )
    .map(lambda r: (r[1][1]['company_id'], r[1]))
    .join(
        companies_rdd.keyBy(lambda com: com['company_id'])
    )
    .map(lambda r: (*r[1][0], r[1][1]))
)