How to implement NOT IN for two DataFrames with different structure in Apache Spark
I am using Apache Spark in my Java application.

I have two DataFrames: df1 and df2. df1 contains Rows with email, firstName and lastName. df2 contains Rows with email only.

I want to create a DataFrame df3 that contains all the rows from df1 whose email is not present in df2.

Is there a way to do this with Apache Spark? I tried to create a JavaRDD<String> from each of df1 and df2 by converting them with toJavaRDD(), mapping df1 down to its emails and then using subtract, but I don't know how to map the resulting JavaRDD back onto df1 to get a DataFrame.

Basically, I need all the rows from df1 whose email is not in df2.
DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
    "WHERE product_id = '" + productId + "'");

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0));

List<String> notBoughtEmails = customers.javaRDD()
    .map(row -> row.getString(0))
    .subtract(customersBoughtEmail).collect();
Spark 2.0.0+

You can use NOT IN directly.
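For example, a minimal sketch (assuming a Spark 2.0+ SparkSession named spark; the view names are the same ones registered in the raw-SQL example further down):

// Spark 2.0.0+ accepts NOT IN subqueries in Spark SQL.
// Caveat: NOT IN follows SQL three-valued logic, so if the subquery can
// return NULL emails, no rows will match; filter NULLs out first if needed.
customers.createOrReplaceTempView("customers")
customersWhoOrderedTheProduct.createOrReplaceTempView("customersWhoOrderedTheProduct")

val df3 = spark.sql("""
  SELECT c.*
  FROM customers c
  WHERE c.email NOT IN (SELECT email FROM customersWhoOrderedTheProduct)""")

df3.show()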
Spark < 2.0.0

You can express it with an outer join and a filter:
val customers = sc.parallelize(Seq(
  ("john@example.com", "John", "Doe"),
  ("jane@example.com", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
  Tuple1("jane@example.com")
)).toDF("email")

// Left outer join on email, keep only the rows with no match
// (the joined email is NULL), then drop the helper column.
val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")),
    $"email" === $"email_", "leftouter")
  .where($"email_".isNull).drop("email_")

customersWhoHaventOrderedTheProduct.show
// +----------------+----------+---------+
// | email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com| John| Doe|
// +----------------+----------+---------+
The raw SQL equivalent:
customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
  "customersWhoOrderedTheProduct")

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN
                 customersWhoOrderedTheProduct o
               ON c.email = o.email
               WHERE o.email IS NULL"""

sqlContext.sql(query).show
// +----------------+----------+---------+
// | email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com| John| Doe|
// +----------------+----------+---------+
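As a side note not in the original answer, Spark 2.0+ can also express this directly in the DataFrame API with an anti join, which removes the need for the helper column and the null filter (a sketch using the same customers and customersWhoOrderedTheProduct DataFrames as above):

// "left_anti" keeps only the rows of the left side that have no match on
// the right, yielding the same single row for john@example.com.
val notOrdered = customers.join(
  customersWhoOrderedTheProduct, Seq("email"), "left_anti")

notOrdered.show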
I did it in Python; additionally, I would suggest using integers as keys instead of strings.
from pyspark.sql.types import *
samples = sc.parallelize([
    ("abonsanto@fakemail.com", "Alberto", "Bonsanto"), ("mbonsanto@fakemail.com", "Miguel", "Bonsanto"),
    ("stranger@fakemail.com", "Stranger", "Weirdo"), ("dbonsanto@fakemail.com", "Dakota", "Bonsanto")
])

keys = sc.parallelize(
    [("abonsanto@fakemail.com",), ("mbonsanto@fakemail.com",), ("dbonsanto@fakemail.com",)]
)

complex_schema = StructType([
    StructField("email", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True)
])

simple_schema = StructType([
    StructField("email", StringType(), True)
])
df1 = sqlContext.createDataFrame(samples, complex_schema)
df2 = sqlContext.createDataFrame(keys, simple_schema)
df1.show()
df2.show()
df3 = df1.join(df2, df1.email == df2.email, "left_outer").where(df2.email.isNull())
df3.show()