PySpark reduce throws Py4JJavaError
I am a beginner with Spark, Hadoop, and the big data ecosystem in general.
I am using Spark 3.0.1, Hadoop 2.7, and Python 3.6.
I have this .json file (the JSON below is just an excerpt of the actual file):
[{"number":122,"name":"122 - LOWER RIVER TCE / ELLIS ST","address":"Lower River Tce / Ellis St","latitude":-27.482279,"longitude":153.028723},{"number":91,"name":"91 - MAIN ST / DARRAGH ST","address":"Main St / Darragh St","latitude":-27.47059,"longitude":153.036046}]
I want to parse it, do some data preparation, and then cluster it with KMeans.
Here is what I have done so far:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.ml.clustering import KMeans
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
import numpy as np
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
import matplotlib.pyplot as plt
conf = SparkConf().setAppName('MyApp')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
FEATURES_COL = ['latitude', 'longitude']
path = 'hdfs:/public/bikes/Brisbane_CityBike.json'
rdd = sc.textFile(path)
# split the single-line JSON array into one fragment per record
rdd = rdd.flatMap(lambda line: line.split('},{'))
# strip the leading '[' and '{' from each fragment
rdd = rdd.map(lambda row: row.replace('[', ""))
rdd = rdd.map(lambda row: row.replace('{', ""))
# re-wrap each fragment as a JSON object
rdd = rdd.map(lambda row: "{"+row+"}")
import json
# parse each fragment and keep (number, [longitude, latitude]) pairs
rdd = rdd.map(lambda row: json.loads(row))
rdd = rdd.map(lambda row: (row['number'], [row['longitude'], row['latitude']]))
When I try to reduce with rdd = rdd.reduce(lambda number, pos: pos),
I get the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-32-5db1324541cf> in <module>
----> 1 rdd=rdd.reduce(lambda number, pos : pos)
/opt/spark/python/pyspark/rdd.py in reduce(self, f)
842 yield reduce(f, iterator, initial)
843
--> 844 vals = self.mapPartitions(func).collect()
845 if vals:
846 return reduce(f, vals)
/opt/spark/python/pyspark/rdd.py in collect(self)
814 """
815 with SCCallSiteSync(self.context) as css:
--> 816 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
817 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
818
/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 16, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "<ipython-input-17-aa02b6b59ccd>", line 2, in <lambda>
File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python3.6/json/decoder.py", line 342, in decode
raise JSONDecodeError("Extra data", s, end)
json.decoder.JSONDecodeError: Extra data: line 1 column 135 (char 134)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1182)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1879)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1878)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:927)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:990)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "<ipython-input-17-aa02b6b59ccd>", line 2, in <lambda>
File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python3.6/json/decoder.py", line 342, in decode
raise JSONDecodeError("Extra data", s, end)
json.decoder.JSONDecodeError: Extra data: line 1 column 135 (char 134)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1182)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Can anyone help me figure this out? I would really appreciate it.
Best regards
The closing curly brace } and the closing square bracket ] do not appear to have been removed yet:
rdd = rdd.map(lambda row: row.replace('[', ""))
rdd = rdd.map(lambda row: row.replace('{', ""))
# add the following (these must run before the "{"+row+"}" re-wrapping step)
rdd = rdd.map(lambda row: row.replace(']', ""))
rdd = rdd.map(lambda row: row.replace('}', ""))
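For reference, here is a minimal plain-Python sketch (my illustration, not part of the original answer) of why the leftover characters produce exactly the "Extra data" error from the traceback. The string mimics what the second fragment looks like after the original replacements and re-wrapping:

import json

# the last fragment still carries ']' and '}' before being wrapped in braces,
# so the decoder finds extra characters after the first complete object
fragment = '{"number":91,"latitude":-27.47059,"longitude":153.036046}]}'
try:
    json.loads(fragment)
except json.JSONDecodeError as e:
    print(e)  # prints something like: Extra data: line 1 column 58 (char 57)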
Or you could let the json package do all of the JSON extraction for you:
conf = SparkConf().setAppName('MyApp')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
FEATURES_COL = ['latitude', 'longitude']
path = 'hdfs:/public/bikes/Brisbane_CityBike.json'
rdd = sc.textFile(path)
import json
# works because the whole JSON array sits on a single line:
# json.loads returns a list of dicts and flatMap yields one record per dict
rdd = rdd.flatMap(lambda line: json.loads(line))
rdd = rdd.map(lambda row: (row['number'], [row['longitude'], row['latitude']]))
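Note that this relies on the whole array being on one line, as in the sample. If the real file is pretty-printed across multiple lines, one alternative (my suggestion, not part of the original answer) is Spark's built-in JSON reader with the multiLine option:

# assumes Spark >= 2.2, where the multiLine option is available
df = sqlContext.read.option('multiLine', 'true').json(path)

From there, to get to the KMeans clustering the question is aiming for, a rough sketch of the remaining steps might look like the following (again my illustration; k=2 and the column handling are arbitrary choices, not from the original post):

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# turn the (number, [longitude, latitude]) pairs into a DataFrame
df = rdd.map(lambda x: (x[0], float(x[1][1]), float(x[1][0]))) \
        .toDF(['number', 'latitude', 'longitude'])

# combine the feature columns into the single vector column KMeans expects
assembler = VectorAssembler(inputCols=FEATURES_COL, outputCol='features')
df = assembler.transform(df)

# fit the model and attach a cluster label to each station
kmeans = KMeans(k=2, seed=1, featuresCol='features')
model = kmeans.fit(df)
model.transform(df).select('number', 'prediction').show()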