pySpark Reduce throws Py4JJavaError

I am a beginner with Spark, Hadoop, and the big data ecosystem in general.

I am using Spark 3.0.1, Hadoop 2.7, and Python 3.6.

I have this .json file (the JSON below is just an excerpt of the actual file):

[{"number":122,"name":"122 - LOWER RIVER TCE / ELLIS ST","address":"Lower River Tce / Ellis St","latitude":-27.482279,"longitude":153.028723},{"number":91,"name":"91 - MAIN ST / DARRAGH ST","address":"Main St / Darragh St","latitude":-27.47059,"longitude":153.036046}]

I want to parse it, do some data preparation on it, and then cluster it with KMeans.

Here is what I have done so far:

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.ml.clustering import KMeans
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
import numpy as np
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
import matplotlib.pyplot as plt

conf = SparkConf().setAppName('MyApp')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

FEATURES_COL = ['latitude', 'longitude']
path = 'hdfs:/public/bikes/Brisbane_CityBike.json'
rdd = sc.textFile(path)

rdd = rdd.flatMap(lambda line: line.split('},{'))

rdd = rdd.map(lambda row: row.replace('[', ""))

rdd = rdd.map(lambda row: row.replace('{', ""))

rdd = rdd.map(lambda row: "{"+row+"}")

import json
rdd = rdd.map(lambda row: json.loads(row))

rdd = rdd.map(lambda row: (row['number'], [row['longitude'], row['latitude']]))

When I try to reduce with rdd=rdd.reduce(lambda number, pos : pos) I get the following error:

    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-32-5db1324541cf> in <module>
    ----> 1 rdd=rdd.reduce(lambda number, pos : pos)

    /opt/spark/python/pyspark/rdd.py in reduce(self, f)
        842             yield reduce(f, iterator, initial)
        843 
    --> 844         vals = self.mapPartitions(func).collect()
        845         if vals:
        846             return reduce(f, vals)

    /opt/spark/python/pyspark/rdd.py in collect(self)
        814         """
        815         with SCCallSiteSync(self.context) as css:
    --> 816             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
        817         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
        818 

    /opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
       1255         answer = self.gateway_client.send_command(command)
       1256         return_value = get_return_value(
    -> 1257             answer, self.gateway_client, self.target_id, self.name)
       1258 
       1259         for temp_arg in temp_args:

    /opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
         61     def deco(*a, **kw):
         62         try:
    ---> 63             return f(*a, **kw)
         64         except py4j.protocol.Py4JJavaError as e:
         65             s = e.java_exception.toString()

    /opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        326                 raise Py4JJavaError(
        327                     "An error occurred while calling {0}{1}{2}.\n".
    --> 328                     format(target_id, ".", name), value)
        329             else:
        330                 raise Py4JError(

    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 16, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
        process()
      File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
        return f(*args, **kwargs)
      File "<ipython-input-17-aa02b6b59ccd>", line 2, in <lambda>
      File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
        return _default_decoder.decode(s)
      File "/usr/lib64/python3.6/json/decoder.py", line 342, in decode
        raise JSONDecodeError("Extra data", s, end)
    json.decoder.JSONDecodeError: Extra data: line 1 column 135 (char 134)

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
        at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:592)
        at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1879)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1878)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:927)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:990)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
        at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
        process()
      File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
        return f(*args, **kwargs)
      File "<ipython-input-17-aa02b6b59ccd>", line 2, in <lambda>
      File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
        return _default_decoder.decode(s)
      File "/usr/lib64/python3.6/json/decoder.py", line 342, in decode
        raise JSONDecodeError("Extra data", s, end)
    json.decoder.JSONDecodeError: Extra data: line 1 column 135 (char 134)

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
        at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:592)
        at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

Can anyone help me with this? I would really appreciate it.

Regards

The closing curly brace } and square bracket ] at the end of the line are never stripped. reduce is an action, so it forces Spark to evaluate the earlier lazy json.loads map, which then fails with "Extra data" on the last record because that record still ends in }]. Add two more replace calls after the existing ones:

rdd = rdd.map(lambda row: row.replace('[', ""))

rdd = rdd.map(lambda row: row.replace('{', ""))

# add the following
rdd = rdd.map(lambda row: row.replace(']', ""))

rdd = rdd.map(lambda row: row.replace('}', ""))

Or you could let the json package do all of the JSON parsing for you:

conf = SparkConf().setAppName('MyApp')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

FEATURES_COL = ['latitude', 'longitude']
path = 'hdfs:/public/bikes/Brisbane_CityBike.json'
rdd = sc.textFile(path)

import json
# each input line is a full JSON array (as in the sample, where the whole array
# sits on one line), so json.loads returns a list of dicts and flatMap yields
# one dict per record
rdd = rdd.flatMap(lambda line: json.loads(line))

rdd = rdd.map(lambda row: (row['number'], [row['longitude'], row['latitude']]))
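
From there, if the goal is the KMeans clustering mentioned in the question, one possible continuation is sketched below; the DataFrame column names, k=3, and seed=1 are illustrative choices, not taken from the question.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# build a DataFrame from the (number, [longitude, latitude]) pairs above
df = rdd.map(lambda x: (x[0], x[1][0], x[1][1])).toDF(['number', 'longitude', 'latitude'])

# assemble the feature columns into a single vector column for MLlib
assembler = VectorAssembler(inputCols=FEATURES_COL, outputCol='features')
df_features = assembler.transform(df)

# k=3 and seed=1 are placeholders; choose k with your own evaluation
kmeans = KMeans(k=3, seed=1, featuresCol='features')
model = kmeans.fit(df_features)
clustered = model.transform(df_features)    # adds a 'prediction' column with the cluster id
clustered.show()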