PySpark error with reduceByKey

I have a problem with my reduceByKey(). It doesn't display the result... I have keys and values... but I can't use reduceByKey...

data_test_bis = data_textfile.map(lambda x: (x.split(",")[8].encode("utf-8").replace('"','').replace("'",''), 1)).filter(lambda x: x[0].startswith('Ru'))#.reduceByKey(lambda x, y: x + y)
#data_test_filter = data_test_bis.filter(lambda x: x[0].startswith('"R'))
print("TEST FILTER !")
print(type(data_test_bis))
print(data_test_bis.take(5))
print(data_test_bis.keys().take(10))
print(data_test_bis.values().take(10))

Result:

TEST FILTER !
<class 'pyspark.rdd.PipelinedRDD'> 
[('Rueil-Malmaison', 1), ('Ruse', 1), ('Rueil Malmaison', 1), ('Rueil-Malmaison', 1), ('Ruda Slaska', 1)]
['Rueil-Malmaison', 'Ruse', 'Rueil Malmaison', 'Rueil-Malmaison', 'Ruda Slaska', 'Ruisbroek (Belgique)', 'Ruda \xc3\x85\xc5\xa1l\xc3\x84\xe2\x80\xa6ska', 'Rueil malmaison', 'Rueil', 'Ruisbroek']
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

When I try this, I get an error:

print(data_test_bis.reduceByKey(add).take(10))

print(data_test_bis.reduceByKey(lambda x, y: x + y).take(10))

Error:

17/01/03 17:47:09 ERROR scheduler.TaskSetManager: Task 18 in stage 3.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "/home/spark/julien/Test_.py", line 89, in <module>
    test()
  File "/home/spark/julien/Test_.py", line 33, in test
    print(data_test_bis.reduceByKey(lambda x, y:x+y).take(10))
  File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1297, in take
  File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 939, in runJob
  File "/home/spark/opt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
  File "/home/spark/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 3.0 failed 4 times, most recent failure: Lost task 18.3 in stage 3.0 (TID 67, 10.0.15.7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
  File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
  File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/home/spark/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1776, in combineLocally
  File "/opt/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
    for k, v in iterator:
  File "/home/spark/julien/Test_.py", line 25, in <lambda>
IndexError: list index out of range

I don't understand why I get an IndexError...

Repeat after me: I will never assume that an unstructured data source is properly formatted.

Things like this:

... .map(lambda x: (x.split(",")[8].encode("utf-8") ...)

are great for quick tutorials but are useless in practice. In general, never depend on the assumption that:

  • the data has a specific shape (e.g. 9 comma-separated fields);
  • encoding/decoding will succeed (it does here, but in general it won't).

At the very least, include some simple exception handling:

def parse_to_pair(line):
    try:
        # Take the 9th comma-separated field and strip quote characters.
        key = (line
            .split(",")[8]
            .encode("utf-8")
            .replace('"', '')
            .replace("'", ''))

        return [(key, 1)]
    except Exception:
        # Malformed lines (too few fields, encoding failures, ...) produce no pair.
        return []

and use flatMap:

data_textfile.flatMap(parse_to_pair)
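
Put together, the pipeline from the question could then look like the following sketch; it reuses data_textfile and the 'Ru' prefix filter from the question, nothing else changes:

data_test_bis = data_textfile.flatMap(parse_to_pair).filter(lambda x: x[0].startswith('Ru'))

# Malformed lines were dropped in parse_to_pair, so the shuffle behind
# reduceByKey no longer evaluates split(",")[8] on a line that is too short.
print(data_test_bis.reduceByKey(lambda x, y: x + y).take(10))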

Remarks:

  • You can skip the encode step by calling SparkContext.textFile with use_unicode set to False (see the first sketch after this list). It will:

    • use str instead of unicode in Python 2;
    • use bytes in Python 3.
  • You should not only make sure that the line contains at least 9 fields, but that it contains exactly the expected number of fields.

  • If you happen to have CSV as input, use a proper csv reader (see the second sketch below).
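
A minimal sketch combining the first two remarks, assuming Python 2 (so lines come back as plain str) and that a well-formed line has exactly 9 fields; the input path is a placeholder:

EXPECTED_FIELDS = 9  # assumed from split(",")[8] in the question

# use_unicode=False returns str (Python 2) / bytes (Python 3), so no encode step is needed.
raw_lines = sc.textFile("/path/to/input", use_unicode=False)

def parse_strict(line):
    fields = line.split(",")
    # Reject lines that do not have exactly the expected number of fields.
    if len(fields) != EXPECTED_FIELDS:
        return []
    return [(fields[8].replace('"', '').replace("'", ''), 1)]

pairs = raw_lines.flatMap(parse_strict)
print(pairs.reduceByKey(lambda x, y: x + y).take(10))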
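
And if the input really is CSV, a per-line csv.reader handles quoted fields that contain commas, which a plain split(",") silently breaks on. Again a sketch, with column index 8 carried over from the question:

import csv

def parse_csv_line(line):
    try:
        # csv.reader parses quoted fields, including embedded commas.
        fields = next(csv.reader([line]))
        return [(fields[8], 1)]
    except (IndexError, csv.Error, StopIteration):
        # Short, malformed or empty lines are dropped.
        return []

pairs = data_textfile.flatMap(parse_csv_line)
print(pairs.reduceByKey(lambda x, y: x + y).take(10))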