使用 Airflow 将 pyspark 作业提交到 EMR 时缺少 Python 依赖项
Missing Python dependency when submitting pyspark job to EMR using Airflow
我们正在使用 bootstrap 脚本在 EMR 集群节点上为我们的 Spark 作业安装 python 库。该脚本看起来像这样:
sudo python3 -m pip install pandas==0.22.0 scikit-learn==0.21.0
集群启动后,我们使用 Airflow 的 SparkSubmitHook
将作业提交到 EMR。我们使用 this 配置将 pyspark 绑定到 python3。问题是,偶尔,当作业开始 运行 时,我们会收到 ModuleNotFoundError: No module named 'sklearn'
错误。一种这样的堆栈跟踪如下所示:
return self.loads(obj)
File "/mnt1/yarn/usercache/root/appcache/application_1565624418111_0001/container_1565624418111_0001_01_000033/pyspark.zip/pyspark/serializers.py", line 577, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'sklearn'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
此问题本质上是偶发性的,因此在 10 次作业提交中,它可能会发生 2-3 次。我们正在使用 EMR 5.23.0。我也尝试升级到 5.26.0,但同样的问题仍然存在。
如果我转到群集节点并检查 'missing' 包,我可以看到它已经安装。所以,显然这不是 bootstrap 脚本的问题。这让我很困惑,因为我对这里发生的事情一无所知。我猜想当从 Airflow 触发作业时它绑定到不同的 python 版本,但这只是在黑暗中拍摄。感谢任何帮助。
类似案例可以参考。不确定它是否适用于 EMR
在hadoop情况下,python环境和包应该安装在用户hadoop或spark下。
如果在 root 或其他用户环境中安装 python 软件包,可能会发生类似情况。
因此,请尝试使用 hadoop
或 spark
.
的相同用户名安装您的软件包
更新============================================ ===
我以前安装过类似spark cloud环境的cloudear work bench。那样的话,分布式依赖也是需要的。
关键是:
-
- 在所有云节点中安装依赖包。
-
- 设置conda虚拟环境
-
- 设置 pyspark 或 pyspark3 路径环境。
-
- 将 yarn & spark 配置部署到网关(sparksubmit 主机或 airflow 主机)。
祝你好运。
如果觉得回答有帮助,请点赞。
解决问题的一种方法是更改将作业提交到集群的方式:
- 将步骤的代码打包到 运行(及其依赖项)在 s3 存储桶上(例如使用 pipenv 和 pipfiles)。包裹看起来像这样:
<script_to_execute_package>.zip
|- <script_to_execute_main>.py
|-other step files.py
|- ...
|-scikit-learn
|-scikit-learn files
| ...
|-pandas
|- pandas files
|- ...
|-other packages
|-other packages files
|- ...
- 不使用 SparkSubmitHook,而是使用 EmrAddStepsOperator (+Sensor +CreateJobFlowOperator)。 运行 打包 Python 代码的步骤。
它会是这样的:
step_to_run = [
{
'Name': 'your_step_name',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ["spark-submit", "--master", "yarn", "--deploy-mode", "client", "--py-files", "s3://<script_to_execute_package>.zip", "/tmp/driver.py", "<script_to_execute_main>.py", "", "--arg_to_pass_1", "arg1", "--arg_to_pass_2", "arg2", ...]
}
}
]
some_task = EmrAddStepsOperator(
task_id='some_task',
job_flow_id='the_previously_created_job_flow_id',
aws_conn_id=aws_conn_id,
steps=extract_check_args_spark_step,
dag=dag
)
some_task_check = EmrStepSensor(
task_id='task_check_extract_check',
job_flow_id='the_previously_created_job_flow_id',
step_id="{{ task_instance.xcom_pull('some_task', key='return_value')[0] }}",
aws_conn_id=aws_conn_id,
poke_interval=10,
dag=dag
)
经过大量试验和错误后,以下代码片段作为 bootstrap 脚本运行良好。注释掉的部分之前也包含在我们的 scipt 中,它引起了问题。删除该部分后,一切似乎都正常。
sudo python3 -m pip install --upgrade pip==19.1.1 >> /tmp/out.log
wget https://download-ib01.fedoraproject.org/pub/epel/7/x86_64/Packages/s/spatialindex-1.8.5-1.el7.x86_64.rpm >> /tmp/out.log
sudo yum -y localinstall spatialindex-1.8.5-1.el7.x86_64.rpm >> /tmp/out.log
sudo python3 -m pip install python-dateutil==2.8.0 pandas==0.22.0 pyarrow==0.13.0 scikit-learn==0.21.0 geopy==1.19.0 Shapely==1.6.4.post2 geohash2==1.1 boto3==1.9.183 rtree==0.8.3 geopandas==0.5.0 >> /tmp/out.log
# python3 -m pip install --user python-dateutil==2.8.0 pandas==0.22.0 pyarrow==0.13.0 geopy==1.19.0 Shapely==1.6.4.post2 geohash2==1.1 boto3==1.9.183
# python3 -m pip install --user scikit-learn==0.21.0
请注意,当通过气流提交作业时,它会以 root 用户身份运行。所以可能这就是 --user
安装不起作用的原因。因为这个脚本在每个 EMR 节点上以用户 hadoop 的身份执行。
如果您在 DAG 文件中使用 LaunchClusterOperator,另一种解决方案是使用
"cluster_overrides" 属性。然后你可以从 this 亚马逊页面复制配置。所以结果看起来像这样(故意提到 "Configurations" 两次):
LaunchClusterOperator(dag=yourdag, param2="something", cluster_overrides={
"Configurations": [
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"}
}
]
}
]
}
)
我们正在使用 bootstrap 脚本在 EMR 集群节点上为我们的 Spark 作业安装 python 库。该脚本看起来像这样:
sudo python3 -m pip install pandas==0.22.0 scikit-learn==0.21.0
集群启动后,我们使用 Airflow 的 SparkSubmitHook
将作业提交到 EMR。我们使用 this 配置将 pyspark 绑定到 python3。问题是,偶尔,当作业开始 运行 时,我们会收到 ModuleNotFoundError: No module named 'sklearn'
错误。一种这样的堆栈跟踪如下所示:
return self.loads(obj)
File "/mnt1/yarn/usercache/root/appcache/application_1565624418111_0001/container_1565624418111_0001_01_000033/pyspark.zip/pyspark/serializers.py", line 577, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'sklearn'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
此问题本质上是偶发性的,因此在 10 次作业提交中,它可能会发生 2-3 次。我们正在使用 EMR 5.23.0。我也尝试升级到 5.26.0,但同样的问题仍然存在。
如果我转到群集节点并检查 'missing' 包,我可以看到它已经安装。所以,显然这不是 bootstrap 脚本的问题。这让我很困惑,因为我对这里发生的事情一无所知。我猜想当从 Airflow 触发作业时它绑定到不同的 python 版本,但这只是在黑暗中拍摄。感谢任何帮助。
类似案例可以参考。不确定它是否适用于 EMR 在hadoop情况下,python环境和包应该安装在用户hadoop或spark下。
如果在 root 或其他用户环境中安装 python 软件包,可能会发生类似情况。
因此,请尝试使用 hadoop
或 spark
.
更新============================================ ===
我以前安装过类似spark cloud环境的cloudear work bench。那样的话,分布式依赖也是需要的。
关键是:
-
- 在所有云节点中安装依赖包。
-
- 设置conda虚拟环境
-
- 设置 pyspark 或 pyspark3 路径环境。
-
- 将 yarn & spark 配置部署到网关(sparksubmit 主机或 airflow 主机)。
祝你好运。
如果觉得回答有帮助,请点赞。
解决问题的一种方法是更改将作业提交到集群的方式:
- 将步骤的代码打包到 运行(及其依赖项)在 s3 存储桶上(例如使用 pipenv 和 pipfiles)。包裹看起来像这样:
<script_to_execute_package>.zip
|- <script_to_execute_main>.py
|-other step files.py
|- ...
|-scikit-learn
|-scikit-learn files
| ...
|-pandas
|- pandas files
|- ...
|-other packages
|-other packages files
|- ...
- 不使用 SparkSubmitHook,而是使用 EmrAddStepsOperator (+Sensor +CreateJobFlowOperator)。 运行 打包 Python 代码的步骤。 它会是这样的:
step_to_run = [
{
'Name': 'your_step_name',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ["spark-submit", "--master", "yarn", "--deploy-mode", "client", "--py-files", "s3://<script_to_execute_package>.zip", "/tmp/driver.py", "<script_to_execute_main>.py", "", "--arg_to_pass_1", "arg1", "--arg_to_pass_2", "arg2", ...]
}
}
]
some_task = EmrAddStepsOperator(
task_id='some_task',
job_flow_id='the_previously_created_job_flow_id',
aws_conn_id=aws_conn_id,
steps=extract_check_args_spark_step,
dag=dag
)
some_task_check = EmrStepSensor(
task_id='task_check_extract_check',
job_flow_id='the_previously_created_job_flow_id',
step_id="{{ task_instance.xcom_pull('some_task', key='return_value')[0] }}",
aws_conn_id=aws_conn_id,
poke_interval=10,
dag=dag
)
经过大量试验和错误后,以下代码片段作为 bootstrap 脚本运行良好。注释掉的部分之前也包含在我们的 scipt 中,它引起了问题。删除该部分后,一切似乎都正常。
sudo python3 -m pip install --upgrade pip==19.1.1 >> /tmp/out.log
wget https://download-ib01.fedoraproject.org/pub/epel/7/x86_64/Packages/s/spatialindex-1.8.5-1.el7.x86_64.rpm >> /tmp/out.log
sudo yum -y localinstall spatialindex-1.8.5-1.el7.x86_64.rpm >> /tmp/out.log
sudo python3 -m pip install python-dateutil==2.8.0 pandas==0.22.0 pyarrow==0.13.0 scikit-learn==0.21.0 geopy==1.19.0 Shapely==1.6.4.post2 geohash2==1.1 boto3==1.9.183 rtree==0.8.3 geopandas==0.5.0 >> /tmp/out.log
# python3 -m pip install --user python-dateutil==2.8.0 pandas==0.22.0 pyarrow==0.13.0 geopy==1.19.0 Shapely==1.6.4.post2 geohash2==1.1 boto3==1.9.183
# python3 -m pip install --user scikit-learn==0.21.0
请注意,当通过气流提交作业时,它会以 root 用户身份运行。所以可能这就是 --user
安装不起作用的原因。因为这个脚本在每个 EMR 节点上以用户 hadoop 的身份执行。
如果您在 DAG 文件中使用 LaunchClusterOperator,另一种解决方案是使用 "cluster_overrides" 属性。然后你可以从 this 亚马逊页面复制配置。所以结果看起来像这样(故意提到 "Configurations" 两次):
LaunchClusterOperator(dag=yourdag, param2="something", cluster_overrides={
"Configurations": [
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"}
}
]
}
]
}
)