Fitter Spark RDD基于过滤不同RDD的结果
Fitter Spark RDD based on result from filtering of different RDD
conf = SparkConf().setAppName("my_app")
with SparkContext(conf=conf) as sc:
sqlContext = SQLContext(sc)
df = sqlContext.read.parquet(*s3keys)
# this gives me distinct values as list
rdd = df.filter(
(1442170800000 <= df.timestamp) & (
df.timestamp <= 1442185200000) & (
df.lat > 40.7480) & (df.lat < 40.7513) & (
df.lon > -73.8492) & (
df.lon < -73.8438)).map(lambda p: p.userid).distinct()
# how do I apply the above list to filter another rdd?
df2 = sqlContext.read.parquet(*s3keys_part2)
# example:
rdd = df2.filter(df2.col1 in (rdd values from above))
如 Matthew Graves 所述,您需要的是连接。大概意思是这样的:
pred = ((1442170800000 <= df.timestamp) &
(df.timestamp <= 1442185200000) &
(df.lat > 40.7480) &
(df.lat < 40.7513) &
(df.lon > -73.8492) &
(df.lon < -73.8438))
users = df.filter(pred).select("userid").distinct()
users.join(df2, users.userid == df2.col1)
这是 Scala 代码,而不是 Python,但希望它仍然可以作为示例。
val x = 1 to 9
val df2 = sc.parallelize(x.map(a => (a,a*a))).toDF()
val df3 = sc.parallelize(x.map(a => (a,a*a*a))).toDF()
这给了我们两个数据框,每个数据框都有名为_1和_2的列,它们是前九个自然数及其squares/cubes。
val fil = df2.filter("_1 < 5") // Nine is too many, let's go to four.
val filJoin = fil.join(df3,fil("_1") === df3("_1")
filJoin.collect
这让我们:
Array[org.apache.spark.sql.Row] = Array([1,1,1,1], [2,4,2,8], [3,9,3,27], [4,16,4,64])
要将此应用于您的问题,我将从以下内容开始:
rdd2 = rdd.join(df2, rdd.userid == df2.userid, 'inner')
但是请注意,我们需要告诉它要加入哪些列,这可能不是 userid
for df2
。我还建议您使用 .select('userid').distinct()
而不是 map(lambda p: p.userid)
,这样它仍然是一个数据框。
您可以了解有关加入 here 的更多信息。
conf = SparkConf().setAppName("my_app")
with SparkContext(conf=conf) as sc:
sqlContext = SQLContext(sc)
df = sqlContext.read.parquet(*s3keys)
# this gives me distinct values as list
rdd = df.filter(
(1442170800000 <= df.timestamp) & (
df.timestamp <= 1442185200000) & (
df.lat > 40.7480) & (df.lat < 40.7513) & (
df.lon > -73.8492) & (
df.lon < -73.8438)).map(lambda p: p.userid).distinct()
# how do I apply the above list to filter another rdd?
df2 = sqlContext.read.parquet(*s3keys_part2)
# example:
rdd = df2.filter(df2.col1 in (rdd values from above))
如 Matthew Graves 所述,您需要的是连接。大概意思是这样的:
pred = ((1442170800000 <= df.timestamp) &
(df.timestamp <= 1442185200000) &
(df.lat > 40.7480) &
(df.lat < 40.7513) &
(df.lon > -73.8492) &
(df.lon < -73.8438))
users = df.filter(pred).select("userid").distinct()
users.join(df2, users.userid == df2.col1)
这是 Scala 代码,而不是 Python,但希望它仍然可以作为示例。
val x = 1 to 9
val df2 = sc.parallelize(x.map(a => (a,a*a))).toDF()
val df3 = sc.parallelize(x.map(a => (a,a*a*a))).toDF()
这给了我们两个数据框,每个数据框都有名为_1和_2的列,它们是前九个自然数及其squares/cubes。
val fil = df2.filter("_1 < 5") // Nine is too many, let's go to four.
val filJoin = fil.join(df3,fil("_1") === df3("_1")
filJoin.collect
这让我们:
Array[org.apache.spark.sql.Row] = Array([1,1,1,1], [2,4,2,8], [3,9,3,27], [4,16,4,64])
要将此应用于您的问题,我将从以下内容开始:
rdd2 = rdd.join(df2, rdd.userid == df2.userid, 'inner')
但是请注意,我们需要告诉它要加入哪些列,这可能不是 userid
for df2
。我还建议您使用 .select('userid').distinct()
而不是 map(lambda p: p.userid)
,这样它仍然是一个数据框。
您可以了解有关加入 here 的更多信息。