如何使用 Python Core API (Apache Spark) 连接三个 RDD?
How to join three RDDs using the Python Core API (Apache Spark)?
我正在尝试使用 Python 核心 API 将 RDD 连接在一起,这是通过 Apache Spark 实现的;然而,我没有运气来完成这个。
目前,我有这三个具有共同属性的RDD:
- users_rdd: user_id
- reviews_rdd:review_id、company_id 和 user_id
- companies_rdd: company_id
现在,当将两个 RDD 连接在一起时,它工作得很好,没有任何问题:
user_rev_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
)
虽然,为了将所有三个连接在一起,我已经尝试过这个,但由于某种原因它对我根本不起作用:
user_rev_com_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
.join(
companies_rdd.keyBy(lambda com: com['company_id'])
)
)
任何关于如何将我的所有三个 RDD 连接在一起的帮助都会非常有帮助,因为我不确定如何正确地做这样的事情。谢谢
第一次加入后,密钥为user_id
,但您加入companies_rdd
的密钥为company_id
,因此加入密钥不正确。您需要将密钥更改为 company_id
,例如
user_rev_com_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
.map(lambda r: (r[1][1]['company_id'], r[1]))
.join(
companies_rdd.keyBy(lambda com: com['company_id'])
)
)
将三个RDD中的元素组合起来,加入后去掉joining key,可以在末尾加一个map
:
user_rev_com_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
.map(lambda r: (r[1][1]['company_id'], r[1]))
.join(
companies_rdd.keyBy(lambda com: com['company_id'])
)
.map(lambda r: (*r[1][0], r[1][1]))
)
我正在尝试使用 Python 核心 API 将 RDD 连接在一起,这是通过 Apache Spark 实现的;然而,我没有运气来完成这个。
目前,我有这三个具有共同属性的RDD:
- users_rdd: user_id
- reviews_rdd:review_id、company_id 和 user_id
- companies_rdd: company_id
现在,当将两个 RDD 连接在一起时,它工作得很好,没有任何问题:
user_rev_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
)
虽然,为了将所有三个连接在一起,我已经尝试过这个,但由于某种原因它对我根本不起作用:
user_rev_com_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
.join(
companies_rdd.keyBy(lambda com: com['company_id'])
)
)
任何关于如何将我的所有三个 RDD 连接在一起的帮助都会非常有帮助,因为我不确定如何正确地做这样的事情。谢谢
第一次加入后,密钥为user_id
,但您加入companies_rdd
的密钥为company_id
,因此加入密钥不正确。您需要将密钥更改为 company_id
,例如
user_rev_com_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
.map(lambda r: (r[1][1]['company_id'], r[1]))
.join(
companies_rdd.keyBy(lambda com: com['company_id'])
)
)
将三个RDD中的元素组合起来,加入后去掉joining key,可以在末尾加一个map
:
user_rev_com_rdd = (users_rdd
.keyBy(lambda user: user['user_id'])
.join(
reviews_rdd.keyBy(lambda rev: rev['user_id'])
)
.map(lambda r: (r[1][1]['company_id'], r[1]))
.join(
companies_rdd.keyBy(lambda com: com['company_id'])
)
.map(lambda r: (*r[1][0], r[1][1]))
)