如何解决 pyspark 中的 pickle 错误?
How to resolve pickle error in pyspark?
我正在遍历文件以收集有关字典中列和行中的值的信息。我有以下在本地工作的代码:
def search_nulls(file_name):
separator = ','
nulls_dict = {}
fp = open(file_name,'r')
null_cols = {}
lines = fp.readlines()
for n,line in enumerate(lines):
line = line.split(separator)
for m,data in enumerate(line):
data = data.strip('\n').strip('\r')
if str(m) not in null_cols:
null_cols[str(m)] = defaultdict(lambda: 0)
if len(data) <= 4:
null_cols[str(m)][str(data)] = null_cols[str(m)][str(data)] + 1
return null_cols
files_to_process = ['tempfile.csv']
results = map(lambda file: search_nulls(file), files_to_process)
以上代码在没有火花的情况下工作正常。
我评论了上面的最后两行,并尝试使用 spark,因为这是需要 运行 分发的东西的原型:
os.environ['SPARK_HOME'] = <path_to_spark_folder>
conf = SparkConf().setAppName("search_files").setMaster('local')
sc = SparkContext(conf=conf)
objects = sc.parallelize(files_to_process)
resulting_object = \
objects.map(lambda file_object: find_nulls(file_object))
result = resulting_object.collect()
但是,当使用 spark 时,这会导致以下错误:
File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream
bytes = self.serializer.dumps(vs)
File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
return pickle.dumps(obj, protocol)
TypeError: expected string or Unicode object, NoneType found
我找不到任何明显的失败原因,因为它 运行 在本地非常完美,而且我没有在工作节点之间共享任何文件。事实上,无论如何我只是 运行 在我的本地机器上使用它。
有谁知道这可能会失败的充分理由吗?
您问题的根源是以下一行:
null_cols[str(m)] = defaultdict(lambda: 0)
如您在 What can be pickled and unpickled? section of the pickle module documentation 中所读:
The following types can be pickled:
- ...
- functions defined at the top level of a module (using def, not lambda)
- built-in functions defined at the top level of a module
- ...
显然 lambda: 0
不符合上述条件。例如,为了使其工作,您可以将 lambda 表达式替换为 int
:
null_cols[str(m)] = defaultdict(int)
我们怎么可能将 lambda 表达式传递给 PySpark 中的高阶函数?细节决定成败。 PySpark 根据上下文使用不同的序列化程序。要序列化闭包,包括 lambda 表达式,它使用支持 lambda 表达式和嵌套函数的自定义 cloudpickle
。它使用默认的 Python 工具来处理数据。
一些旁注:
- 我不会使用 Python
file
对象来读取数据。它不可移植,不能在本地文件系统之外工作。您可以改用 SparkContex.wholeTextFiles
。
- 如果确实如此,请确保关闭连接。使用
with
语句通常是最好的方法
- 您可以在拆分行之前安全地删除换行符
我正在遍历文件以收集有关字典中列和行中的值的信息。我有以下在本地工作的代码:
def search_nulls(file_name):
separator = ','
nulls_dict = {}
fp = open(file_name,'r')
null_cols = {}
lines = fp.readlines()
for n,line in enumerate(lines):
line = line.split(separator)
for m,data in enumerate(line):
data = data.strip('\n').strip('\r')
if str(m) not in null_cols:
null_cols[str(m)] = defaultdict(lambda: 0)
if len(data) <= 4:
null_cols[str(m)][str(data)] = null_cols[str(m)][str(data)] + 1
return null_cols
files_to_process = ['tempfile.csv']
results = map(lambda file: search_nulls(file), files_to_process)
以上代码在没有火花的情况下工作正常。 我评论了上面的最后两行,并尝试使用 spark,因为这是需要 运行 分发的东西的原型:
os.environ['SPARK_HOME'] = <path_to_spark_folder>
conf = SparkConf().setAppName("search_files").setMaster('local')
sc = SparkContext(conf=conf)
objects = sc.parallelize(files_to_process)
resulting_object = \
objects.map(lambda file_object: find_nulls(file_object))
result = resulting_object.collect()
但是,当使用 spark 时,这会导致以下错误:
File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream
bytes = self.serializer.dumps(vs)
File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
return pickle.dumps(obj, protocol)
TypeError: expected string or Unicode object, NoneType found
我找不到任何明显的失败原因,因为它 运行 在本地非常完美,而且我没有在工作节点之间共享任何文件。事实上,无论如何我只是 运行 在我的本地机器上使用它。
有谁知道这可能会失败的充分理由吗?
您问题的根源是以下一行:
null_cols[str(m)] = defaultdict(lambda: 0)
如您在 What can be pickled and unpickled? section of the pickle module documentation 中所读:
The following types can be pickled:
- ...
- functions defined at the top level of a module (using def, not lambda)
- built-in functions defined at the top level of a module
- ...
显然 lambda: 0
不符合上述条件。例如,为了使其工作,您可以将 lambda 表达式替换为 int
:
null_cols[str(m)] = defaultdict(int)
我们怎么可能将 lambda 表达式传递给 PySpark 中的高阶函数?细节决定成败。 PySpark 根据上下文使用不同的序列化程序。要序列化闭包,包括 lambda 表达式,它使用支持 lambda 表达式和嵌套函数的自定义 cloudpickle
。它使用默认的 Python 工具来处理数据。
一些旁注:
- 我不会使用 Python
file
对象来读取数据。它不可移植,不能在本地文件系统之外工作。您可以改用SparkContex.wholeTextFiles
。 - 如果确实如此,请确保关闭连接。使用
with
语句通常是最好的方法 - 您可以在拆分行之前安全地删除换行符