Jupyter Notebook PySpark OSError [WinError 123] The filename, directory name, or volume label syntax is incorrect:
System configuration:
OS: Windows 10
Python version: 3.7
Spark version: 2.4.4
SPARK_HOME: C:\spark\spark-2.4.4-bin-hadoop2.7
Problem
I am using PySpark to run a computation in parallel on all the columns of each row of a dataframe. I convert my Pandas dataframe to a Spark dataframe, then apply a map transformation followed by a collect action on the Spark dataframe. While the collect action executes, a Py4J error wrapping an OSError pops up. The error arises from the import sklearn statement and the trained classifier (ML model).
Code snippet
from sklearn.neural_network.multilayer_perceptron import MLPClassifier
import pandas as pd

classifier = MLPClassifier()
classifier.fit(x_train, y_train)  # x_train / y_train prepared elsewhere in the notebook

def func1(rows, trained_model=classifier):
    items = rows.asDict()
    row = pd.Series(items)
    output = func2(row, trained_model)  # consumes the pandas Series in another file that has an import sklearn statement
    return output

spdf = spark.createDataFrame(pandasDF)
result = spdf.rdd.map(lambda row: func1(row)).collect()
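Note that because trained_model=classifier is captured as a default argument, the fitted model is pickled into the task closure and unpickled on every executor, and that unpickling is what runs the import sklearn chain inside the worker process (this is where the traceback below starts). A hedged alternative sketch using a broadcast variable (assuming spark, classifier, pandasDF and func2 from the snippet above) keeps the same behaviour but makes the shipping of the model explicit:

import pandas as pd

bc_model = spark.sparkContext.broadcast(classifier)  # serialized once, shared by all tasks

def func1_bc(row):
    items = row.asDict()
    series = pd.Series(items)
    return func2(series, bc_model.value)  # func2 as in the snippet above

result = spark.createDataFrame(pandasDF).rdd.map(func1_bc).collect()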
Error
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-33-0bfb9d088e2d> in <module>
----> 1 result=spdf.rdd.map(lambda row:clusterCreation(row)).collect()
2 print(type(result))
.
.
.
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2.0 failed 1 times, most recent failure: Lost task 2.0 in stage 2.0 (TID 5, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 364, in main
File "C:\spark\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 71, in read_command
File "C:\spark\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 580, in loads
return pickle.loads(obj, encoding=encoding)
.
.
.
File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\ensemble\__init__.py", line 7, in <module>
from .forest import RandomForestClassifier
File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\ensemble\forest.py", line 53, in <module>
from ..metrics import r2_score
File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\metrics\__init__.py", line 7, in <module>
from .ranking import auc
File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\metrics\ranking.py", line 35, in <module>
from ..preprocessing import label_binarize
File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\preprocessing\__init__.py", line 6, in <module>
from ._function_transformer import FunctionTransformer
File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\preprocessing\_function_transformer.py", line 5, in <module>
from ..utils.testing import assert_allclose_dense_sparse
File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\utils\testing.py", line 718, in <module>
import pytest
File "C:\Users\rkagr\Anaconda3\lib\site-packages\pytest.py", line 6, in <module>
from _pytest.assertion import register_assert_rewrite
File "C:\Users\rkagr\Anaconda3\lib\site-packages\_pytest\assertion\__init__.py", line 6, in <module>
from _pytest.assertion import rewrite
File "C:\Users\rkagr\Anaconda3\lib\site-packages\_pytest\assertion\rewrite.py", line 20, in <module>
from _pytest.assertion import util
File "C:\Users\rkagr\Anaconda3\lib\site-packages\_pytest\assertion\util.py", line 5, in <module>
import _pytest._code
File "C:\Users\rkagr\Anaconda3\lib\site-packages\_pytest\_code\__init__.py", line 2, in <module>
from .code import Code # noqa
File "C:\Users\rkagr\Anaconda3\lib\site-packages\_pytest\_code\code.py", line 11, in <module>
import pluggy
File "C:\Users\rkagr\Anaconda3\lib\site-packages\pluggy\__init__.py", line 16, in <module>
from .manager import PluginManager, PluginValidationError
File "C:\Users\rkagr\Anaconda3\lib\site-packages\pluggy\manager.py", line 6, in <module>
import importlib_metadata
File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 466, in <module>
__version__ = version(__name__)
File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 433, in version
return distribution(package).version
File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 406, in distribution
return Distribution.from_name(package)
File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 176, in from_name
dist = next(dists, None)
File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 362, in <genexpr>
for path in map(cls._switch_path, paths)
File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 377, in _search_path
if not root.is_dir():
File "C:\Users\rkagr\Anaconda3\lib\pathlib.py", line 1351, in is_dir
return S_ISDIR(self.stat().st_mode)
File "C:\Users\rkagr\Anaconda3\lib\pathlib.py", line 1161, in stat
return self._accessor.stat(self)
OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect: 'C:\C:\spark\spark-2.4.4-bin-hadoop2.7\jars\spark-core_2.11-2.4.4.jar'
MCVE
This MCVE defines the function to return the input row unchanged as a dictionary, while the original code returns the dictionary after some processing.
import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName('MRC').setMaster('local[2]')
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

import sklearn
import sklearn.datasets
import sklearn.model_selection
import sklearn.ensemble

iris = sklearn.datasets.load_iris()
train, test, labels_train, labels_test = sklearn.model_selection.train_test_split(iris.data, iris.target, train_size=0.80)
classifier = sklearn.ensemble.RandomForestClassifier()
classifier.fit(train, labels_train)

import pickle
path = './random_classifier.mdl'
with open(path, 'wb') as fp:
    pickle.dump(classifier, fp)

import pandas as pd
pddf = pd.DataFrame(test)
spdf = spark.createDataFrame(pddf)

def clusterCreation(rows, classifier_path):
    items = rows.asDict()
    row = pd.Series(items)
    # unpickling the classifier inside the worker triggers the sklearn import chain
    with open(classifier_path, 'rb') as fp:
        classifier = pickle.load(fp)
    print(classifier)
    return items

result = spdf.rdd.map(lambda row: clusterCreation(row, classifier_path=path)).collect()
print(result)
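The OSError at the bottom of the stack shows importlib_metadata failing on a malformed sys.path entry ('C:\C:\...'), which it hits while scanning installed distributions during the pytest import that sklearn triggers. A minimal diagnostic sketch (assuming the sc SparkContext from the MCVE above) to print sys.path as the worker process sees it and spot such entries:

def worker_sys_path(_):
    import sys
    return sys.path

# run a single-partition job so the paths come from one worker
for entry in sc.parallelize([0], 1).flatMap(worker_sys_path).collect():
    print(entry)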
I ran into the same problem, with the file path containing C:\C:\. I found a discussion at https://github.com/Ibotta/sk-dist/issues/30 which suggests this may be an issue with pytest as used by scikit-learn. The issue was reported against scikit-learn version 0.21.3. I upgraded the scikit-learn package to 0.22.1 (by upgrading to Anaconda 2020.02) and the error went away.
My environment is Windows 10, Spark 2.4.5, Anaconda 2020.02 (which includes scikit-learn 0.22.1). Note that the older Anaconda release 2019.10 ships scikit-learn version 0.21.3.
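If you want to confirm the fix without a full Anaconda upgrade, here is a sketch of the version check and a package-level upgrade (0.22.1 is simply the version that worked in my environment):

import sklearn
print(sklearn.__version__)  # the linked issue was reported against 0.21.3

# then, from a conda prompt:
#   conda install scikit-learn=0.22.1
# or with pip:
#   pip install --upgrade scikit-learn==0.22.1

Restart the Jupyter kernel afterwards so both the driver and the Spark workers pick up the new version.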