Pyspark Error ReduceByKey
I have a problem with my reduceByKey(). It doesn't display the results... I have keys and values... but I can't use reduceByKey...
data_test_bis = data_textfile.map(lambda x: (x.split(",")[8].encode("utf-8").replace('"','').replace("'",''), 1)).filter(lambda x: x[0].startswith('Ru'))#.reduceByKey(lambda x, y: x + y)
#data_test_filter = data_test_bis.filter(lambda x: x[0].startswith('"R'))
print("TEST FILTER !")
print(type(data_test_bis))
print(data_test_bis.take(5))
print(data_test_bis.keys().take(10))
print(data_test_bis.values().take(10))
Result:
TEST FILTER !
<class 'pyspark.rdd.PipelinedRDD'>
[('Rueil-Malmaison', 1), ('Ruse', 1), ('Rueil Malmaison', 1), ('Rueil-Malmaison', 1), ('Ruda Slaska', 1)]
['Rueil-Malmaison', 'Ruse', 'Rueil Malmaison', 'Rueil-Malmaison', 'Ruda Slaska', 'Ruisbroek (Belgique)', 'Ruda \xc3\x85\xc5\xa1l\xc3\x84\xe2\x80\xa6ska', 'Rueil malmaison', 'Rueil', 'Ruisbroek']
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
When I try this, I get an error:
print(data_test_bis.reduceByKey(add).take(10))
or
print(data_test_bis.reduceByKey(lambda x, y: x + y).take(10))
Error:
17/01/03 17:47:09 ERROR scheduler.TaskSetManager: Task 18 in stage 3.0 failed 4 times; aborting job
Traceback (most recent call last):
File "/home/spark/julien/Test_.py", line 89, in <module>
test()
File "/home/spark/julien/Test_.py", line 33, in test
print(data_test_bis.reduceByKey(lambda x, y:x+y).take(10))
File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1297, in take
File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 939, in runJob
File "/home/spark/opt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
File "/home/spark/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 3.0 failed 4 times, most recent failure: Lost task 18.3 in stage 3.0 (TID 67, 10.0.15.7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1776, in combineLocally
File "/opt/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
for k, v in iterator:
File "/home/spark/julien/Test_.py", line 25, in <lambda>
IndexError: list index out of range
I don't understand why I get an IndexError...
Repeat after me: I will never assume that data from an unstructured source is well formatted.
Things like this:
... .map(lambda x: (x.split(",")[8].encode("utf-8") ...)
are great for quick tutorials but are useless in practice. In general, never depend on the assumptions that:
- the data has a specific shape (for example at least 9 comma-separated fields),
- encoding / decoding will succeed (it actually does here, but in general it won't).
At the very least, include some simple exception handling:
def parse_to_pair(line):
    try:
        key = (line
               .split(",")[8]
               .encode("utf-8")
               .replace('"', '')
               .replace("'", ''))
        return [(key, 1)]
    except:
        return []
and use flatMap:
data_textfile.flatMap(parse_to_pair)
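Put together, a minimal sketch of the fixed pipeline could look like this (data_textfile and the 'Ru' filter are reused from the question; city_counts is just an illustrative name). Malformed lines yield no pair at all, so reduceByKey no longer hits the IndexError:

city_counts = (data_textfile
               .flatMap(parse_to_pair)                     # malformed lines contribute nothing
               .filter(lambda kv: kv[0].startswith('Ru'))  # same filter as in the question
               .reduceByKey(lambda x, y: x + y))

print(city_counts.take(10))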
Remarks:
- You can skip the encode by calling SparkContext.textFile with use_unicode set to False. It will:
  - use str instead of unicode in Python 2,
  - use bytes in Python 3.
- You should make sure the line contains not just at least 9 fields, but exactly the expected number of fields.
- If you happen to have csv as input, use a csv reader (a brief sketch of these remarks follows the list).
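A minimal sketch of those remarks, assuming Python 2 (where use_unicode=False yields plain str lines), that sc is the usual SparkContext, and that a well-formed record has 10 comma-separated fields; the path and the field count are placeholders, not values from the question:

import csv

EXPECTED_FIELDS = 10  # placeholder: how many fields a well-formed record should have

def parse_csv_pair(line):
    # the csv reader handles quoted fields, so no manual replace('"', '') is needed
    fields = next(csv.reader([line]))
    if len(fields) != EXPECTED_FIELDS:
        return []                  # drop malformed lines instead of raising IndexError
    return [(fields[8], 1)]

# use_unicode=False gives str lines in Python 2 (bytes in Python 3)
raw = sc.textFile("path/to/input.csv", use_unicode=False)
pairs = raw.flatMap(parse_csv_pair)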