PySpark - Join two RDDs - Cannot join - Too many values to unpack
I have two (very simple) files in HDFS:
test:
1,Team1
2,Team2
3,Team3
test2:
11,Player1,Team1
22,Player1,Team2
32,Player1,Team3
I want to join them (on the Team* column) to get the following output:
Team1,1,11,Player1
Team3,3,32,Player1
To do that, I am using the following code:
test = sc.textFile("/user/cloudera/Tests/test")
test_filter = test.filter(lambda a: a.split(",")[1].upper() == "TEAM1" or a.split(",")[1].upper() == "TEAM2")
test_map = test_filter.map(lambda a: a.upper())
test_map = test_map.map(lambda a: (a.split(",")[1]))
for i in test_map.collect(): print(i)
test2=sc.textFile("/user/cloudera/Tests/test2")
test2_map = test2.map(lambda a: a.upper())
test2_map = test2_map.map(lambda a: (a.split(",")[2], a.split(",")[1]))
for i in test2_map.collect(): print(i)
test_join = test_map.join(test2_map)
for i in test_join.collect(): print(i)
But when I try to view the joined RDD, I get the following error:
File "/usr/lib/spark/python/pyspark/rdd.py", line 1807, in <lambda>
map_values_fn = lambda (k, v): (k, f(v))
ValueError: too many values to unpack
at org.apache.spark.api.python.PythonRDD$$anon.read(PythonRDD.scala:135)
at org.apache.spark.api.python.PythonRDD$$anon.<init>(PythonRDD.scala:176)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
What am I doing wrong?
Thanks!
Can you show the result sets of these two statements:
for i in test_map.collect(): print(i)
&
for i in test2_map.collect(): print(i)
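For reference, given the sample files above, those two collects should make the problem visible: test_map contains bare strings, not (key, value) pairs, and join() tries to unpack every element of both RDDs as (k, v), so a string like 'TEAM1' fails with "too many values to unpack". Assuming the data shown in the question, the first loop would print something like:
TEAM1
TEAM2
while the second would print (in some order):
('TEAM1', 'PLAYER1')
('TEAM2', 'PLAYER1')
('TEAM3', 'PLAYER1')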
You can also try the following:
test = sc.textFile("/user/cloudera/Tests/test")
test_map = test.map(lambda a: a.upper())
# key by team name so join() receives (key, value) pairs on both sides
test_map = test_map.map(lambda a: (a.split(",")[1], a.split(",")[0]))
for i in test_map.collect(): print(i)
test2 = sc.textFile("/user/cloudera/Tests/test2")
test2_map = test2.map(lambda a: a.upper())
test2_map = test2_map.map(lambda a: (a.split(",")[2], a.split(",")[1]))
for i in test2_map.collect(): print(i)
test_join = test_map.join(test2_map)
for i in test_join.collect(): print(i)
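If you still want the TEAM1/TEAM2 filter from your original code, you can keep it; the only fix needed is to turn the filtered RDD into (key, value) pairs keyed by team name. A minimal sketch, assuming the same file layout:
test = sc.textFile("/user/cloudera/Tests/test")
test_filter = test.filter(lambda a: a.split(",")[1].upper() in ("TEAM1", "TEAM2"))
# emit (team, id) pairs instead of bare team strings
test_map = test_filter.map(lambda a: a.upper()).map(lambda a: (a.split(",")[1], a.split(",")[0]))
test2 = sc.textFile("/user/cloudera/Tests/test2")
test2_map = test2.map(lambda a: a.upper())
test2_map = test2_map.map(lambda a: (a.split(",")[2], a.split(",")[1]))
test_join = test_map.join(test2_map)
for i in test_join.collect(): print(i)
With the sample data this should print, in some order, ('TEAM1', ('1', 'PLAYER1')) and ('TEAM2', ('2', 'PLAYER1')); join() combines the values from both RDDs under the shared team key.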