How to implement NOT IN for two DataFrames with different structure in Apache Spark

I'm using Apache Spark in a Java application. I have two DataFrames, df1 and df2. df1 contains Rows with email, firstName and lastName; df2 contains Rows with email.

I want to create a DataFrame df3 containing all rows from df1 whose email does not appear in df2.

Is there a way to do this with Apache Spark? I tried creating a JavaRDD&lt;String&gt; from each of df1 and df2 by converting them with toJavaRDD(), mapping df1 down to its emails, and then using subtract, but I don't know how to map the resulting JavaRDD back onto df1's rows to get a DataFrame.

Basically I need all rows from df1 whose email is not in df2.

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer ");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
                            "WHERE product_id = '" + productId + "'");

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0));

List<String> notBoughtEmails = customers.javaRDD()
                        .map(row -> row.getString(0))
                        .subtract(customersBoughtEmail).collect();
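
For reference, one way to finish this approach is to filter the original DataFrame with the collected emails (a sketch, assuming Spark 1.5+, where Column.isin is available; note that it ships every email through the driver, which the join-based answers below avoid):

// Hypothetical continuation of the snippet above: turn the collected
// emails back into a DataFrame by filtering the original customers.
DataFrame notBoughtCustomers = customers.filter(
        customers.col("email").isin(notBoughtEmails.toArray()));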

Spark 2.0.0+

You can use NOT IN directly:
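
For example (a minimal sketch, assuming a Spark 2.0 SparkSession named spark, shown here for the asker's Java setup; in the Java API a DataFrame is a Dataset&lt;Row&gt; as of 2.0):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register both DataFrames as temp views, then use a NOT IN subquery
customers.createOrReplaceTempView("customers");
customersWhoOrderedTheProduct.createOrReplaceTempView("customersWhoOrderedTheProduct");

Dataset<Row> df3 = spark.sql(
    "SELECT * FROM customers " +
    "WHERE email NOT IN (SELECT email FROM customersWhoOrderedTheProduct)");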

Spark < 2.0.0

It can be expressed with an outer join and a filter:

val customers = sc.parallelize(Seq(
  ("john@example.com", "John", "Doe"),
  ("jane@example.com", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
  Tuple1("jane@example.com")
)).toDF("email")

val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")),
    $"email" === $"email_", "leftouter")
  .where($"email_".isNull).drop("email_")

customersWhoHaventOrderedTheProduct.show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+

Raw SQL equivalent:

customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
  "customersWhoOrderedTheProduct")

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN  
                 customersWhoOrderedTheProduct o
               ON c.email = o.email
               WHERE o.email IS NULL"""

sqlContext.sql(query).show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+
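
In the asker's Java application, the same outer-join-and-filter approach looks roughly like this (a sketch, assuming the Spark 1.4+ Java DataFrame API; email_ is just a temporary alias to disambiguate the two email columns):

import static org.apache.spark.sql.functions.col;

DataFrame customersWhoHaventOrderedTheProduct = customers
    .join(customersWhoOrderedTheProduct.select(col("email").alias("email_")),
          customers.col("email").equalTo(col("email_")), "leftouter")
    .where(col("email_").isNull())  // keep customers with no matching order
    .drop("email_");                // remove the helper column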

I did this in Python. Also, I'd suggest you use integers as keys rather than strings.

from pyspark.sql.types import *

samples = sc.parallelize([
    ("abonsanto@fakemail.com", "Alberto", "Bonsanto"), ("mbonsanto@fakemail.com", "Miguel", "Bonsanto"),
    ("stranger@fakemail.com", "Stranger", "Weirdo"), ("dbonsanto@fakemail.com", "Dakota", "Bonsanto")
])

keys = sc.parallelize(
    [("abonsanto@fakemail.com",), ("mbonsanto@fakemail.com",), ("dbonsanto@fakemail.com",)]
)

complex_schema = StructType([
    StructField("email", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True)
])

simple_schema = StructType([
    StructField("email", StringType(), True)
])

df1 = sqlContext.createDataFrame(samples, complex_schema)
df2 = sqlContext.createDataFrame(keys, simple_schema)

df1.show()
df2.show()

# show() returns None, so assign the DataFrame first and display it separately
df3 = df1.join(df2, df1.email == df2.email, "left_outer").where(df2.email.isNull())
df3.show()