有没有办法在另一个地图函数中使用地图函数?
Is there a way to use a map function inside another map function?
我需要使用另一个 RDD 中的值来“转换”一个 RDD 中的值
像这样:
rdd1=sc.parallelize(['aa,bb','cc,dd','ee,aa'])
rdd2=sc.parallelize(['aa,1' , 'bb,2' , 'cc,3' , 'dd,4' , 'ee,5'])
result: ['1,2', '3,4' , '5,1']
我试过使用以下地图函数
def mymap (c):
src=c[0]
dst=c[2]
srcnew=rdd2.lookup(src)[0]
dstnew=rdd2.lookup(dst)[0]
return (srcnew,dstnew)
rdd3=rdd1.map(mymap)
但我收到以下错误:
It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
我想到的唯一解决方案是使用 collect() 命令,但我的数据集很大。有没有办法在另一个RDD中调用一个RDD中的值?
如果您的数据集很大,您可以使用 Pair RDD Join operation 并利用 Spark RDD
的分布式计算
在您的情况下,您可以按如下方式使用两个联接查找值:
val rdd1 = sc.parallelize(Seq("aa,bb", "cc,dd", "ee,ff"))
val rdd2 = sc.parallelize(Seq("aa,1", "bb,2", "cc,3", "dd,4", "ee,5", "ff,6"))
// Transform rdd to Key Value (KV) RDD (a.k.a Pair RDD)
val rdd1KV: RDD[(String, String)] = rdd1.map(x => {
val elements = x.split(",")
(elements.head, elements.last)
})
val rdd2KV: RDD[(String, String)] = rdd2.map({ x =>
val elements = x.split(",")
(elements.head, elements.last)
})
/*
Join and format the RDD as KV
(ee,(ff,5)) => (ff,5)
(aa,(bb,1)) => (bb,1)
(cc,(dd,3)) => (dd,3)
*/
val rddTmp = rdd1KV.join(rdd2KV).map(x => x._2)
/* Second join and format the output
(dd,(3,4)) => 3,4
(ff,(5,6)) => 5,6
(bb,(1,2)) => 1,2
*/
rddTmp.join(rdd2KV).map(x => s"${x._2._1},${x._2._2}").foreach(println)
您应该能够通过一些简单的连接和映射来完成此操作。请注意下面的 rdd1
和 rdd2
已使用映射函数进行了调整,因此条目可作为数组使用。
rdd1=sc.parallelize(['aa,bb','cc,dd','ee,ff']) \
.map(lambda x: x.split(','))
rdd2=sc.parallelize(['aa,1','bb,2','cc,3','dd,4','ee,5','ff,6']) \
.map(lambda x: x.split(','))
rdd3 = rdd1 \
.join(rdd2) \
.map(lambda x: x[1]) \
.join(rdd2) \
.map(lambda x: x[1]) \
.map(lambda x: ','.join(x))
rdd3.foreach(print)
# outputs
# 1,2
# 3,4
# 5,6
以上会
- 加入 rdds 以解析 rdd1 中的第一个条目给
[('aa', ('bb', '1')), ('cc', ('dd', '3')), ('ee', ('ff', '5'))]
- 将上面的 rdd 映射为来自连接 giving
的值
[('bb', '1'), ('dd', '3'), ('ff', '5')]
- 再次加入 rdd2 以解析 ordinally 来自 rdd1 的第二个条目
[('bb', ('1', '2')), ('dd', ('3', '4')), ('ff', ('5', '6'))]
- 再次,将上面的 rdd 映射为来自连接的值 giving
[('1', '2'), ('3', '4'), ('5', '6')]
- 将元组映射到字符串,加入
,
给出
['1,2', '3,4', '5,6']
第 4 步和第 5 步显然可以合并为 lambda x : ','.join(x[1])
- 为了清楚起见,我在上面将它们分开了
有关需要保留初始密钥的更复杂的示例,请参阅此
我需要使用另一个 RDD 中的值来“转换”一个 RDD 中的值 像这样:
rdd1=sc.parallelize(['aa,bb','cc,dd','ee,aa'])
rdd2=sc.parallelize(['aa,1' , 'bb,2' , 'cc,3' , 'dd,4' , 'ee,5'])
result: ['1,2', '3,4' , '5,1']
我试过使用以下地图函数
def mymap (c):
src=c[0]
dst=c[2]
srcnew=rdd2.lookup(src)[0]
dstnew=rdd2.lookup(dst)[0]
return (srcnew,dstnew)
rdd3=rdd1.map(mymap)
但我收到以下错误:
It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
我想到的唯一解决方案是使用 collect() 命令,但我的数据集很大。有没有办法在另一个RDD中调用一个RDD中的值?
如果您的数据集很大,您可以使用 Pair RDD Join operation 并利用 Spark RDD
的分布式计算在您的情况下,您可以按如下方式使用两个联接查找值:
val rdd1 = sc.parallelize(Seq("aa,bb", "cc,dd", "ee,ff"))
val rdd2 = sc.parallelize(Seq("aa,1", "bb,2", "cc,3", "dd,4", "ee,5", "ff,6"))
// Transform rdd to Key Value (KV) RDD (a.k.a Pair RDD)
val rdd1KV: RDD[(String, String)] = rdd1.map(x => {
val elements = x.split(",")
(elements.head, elements.last)
})
val rdd2KV: RDD[(String, String)] = rdd2.map({ x =>
val elements = x.split(",")
(elements.head, elements.last)
})
/*
Join and format the RDD as KV
(ee,(ff,5)) => (ff,5)
(aa,(bb,1)) => (bb,1)
(cc,(dd,3)) => (dd,3)
*/
val rddTmp = rdd1KV.join(rdd2KV).map(x => x._2)
/* Second join and format the output
(dd,(3,4)) => 3,4
(ff,(5,6)) => 5,6
(bb,(1,2)) => 1,2
*/
rddTmp.join(rdd2KV).map(x => s"${x._2._1},${x._2._2}").foreach(println)
您应该能够通过一些简单的连接和映射来完成此操作。请注意下面的 rdd1
和 rdd2
已使用映射函数进行了调整,因此条目可作为数组使用。
rdd1=sc.parallelize(['aa,bb','cc,dd','ee,ff']) \
.map(lambda x: x.split(','))
rdd2=sc.parallelize(['aa,1','bb,2','cc,3','dd,4','ee,5','ff,6']) \
.map(lambda x: x.split(','))
rdd3 = rdd1 \
.join(rdd2) \
.map(lambda x: x[1]) \
.join(rdd2) \
.map(lambda x: x[1]) \
.map(lambda x: ','.join(x))
rdd3.foreach(print)
# outputs
# 1,2
# 3,4
# 5,6
以上会
- 加入 rdds 以解析 rdd1 中的第一个条目给
[('aa', ('bb', '1')), ('cc', ('dd', '3')), ('ee', ('ff', '5'))]
- 将上面的 rdd 映射为来自连接 giving
的值[('bb', '1'), ('dd', '3'), ('ff', '5')]
- 再次加入 rdd2 以解析 ordinally 来自 rdd1 的第二个条目
[('bb', ('1', '2')), ('dd', ('3', '4')), ('ff', ('5', '6'))]
- 再次,将上面的 rdd 映射为来自连接的值 giving
[('1', '2'), ('3', '4'), ('5', '6')]
- 将元组映射到字符串,加入
,
给出
['1,2', '3,4', '5,6']
第 4 步和第 5 步显然可以合并为 lambda x : ','.join(x[1])
- 为了清楚起见,我在上面将它们分开了
有关需要保留初始密钥的更复杂的示例,请参阅此