运行 一个 python Spark 上的 Apache Beam 管道
Running a python Apache Beam Pipeline on Spark
我在这里尝试使用 apache beam(使用 python sdk),所以我创建了一个简单的管道并尝试将其部署到 Spark 集群上。
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
op = PipelineOptions([
"--runner=DirectRunner"
]
)
with beam.Pipeline(options=op) as p:
p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x+1) | beam.Map(print)
此管道与 DirectRunner 配合良好。所以要在 Spark 上部署相同的代码(因为可移植性是 Beam 中的一个关键概念)...
首先我编辑了 PipelineOptions
提到的 here:
op = PipelineOptions([
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=LOOPBACK"
]
)
job_endpoint
是 url 到 beam spark job server 的 docker 容器,我 运行 使用命令:
docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://SPARK_URL:SPARK_PORT
这应该可以正常工作,但作业在 Spark 上失败并出现此错误:
20/10/31 14:35:58 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 6543101073799644159, local class serialVersionUID = 1574364215946805297
此外,我在 beam_spark_job_server
日志中有此警告:
WARN org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a new Spark Context.
知道这里的问题出在哪里吗?有没有其他方法可以 运行 python Beam Pipelines on spark 而不经过容器化服务?
出现这种情况的原因可能是作业服务器中包含的 Spark 客户端版本与您要向其提交作业的 Spark 版本不匹配。
希望现在回答问题还不算太晚。是的,这是由于版本不匹配。我测试过,它只适用于 Spark 2,不适用于版本 3。如果您需要 Kubernetes 上的 运行 示例,可以参考 https://github.com/cometta/python-apache-beam-spark 。如果它对你有用,可以帮我 'Star' 存储库。随意在存储库上创建一个问题,我会调查它。
我在这里尝试使用 apache beam(使用 python sdk),所以我创建了一个简单的管道并尝试将其部署到 Spark 集群上。
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
op = PipelineOptions([
"--runner=DirectRunner"
]
)
with beam.Pipeline(options=op) as p:
p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x+1) | beam.Map(print)
此管道与 DirectRunner 配合良好。所以要在 Spark 上部署相同的代码(因为可移植性是 Beam 中的一个关键概念)...
首先我编辑了 PipelineOptions
提到的 here:
op = PipelineOptions([
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=LOOPBACK"
]
)
job_endpoint
是 url 到 beam spark job server 的 docker 容器,我 运行 使用命令:
docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://SPARK_URL:SPARK_PORT
这应该可以正常工作,但作业在 Spark 上失败并出现此错误:
20/10/31 14:35:58 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 6543101073799644159, local class serialVersionUID = 1574364215946805297
此外,我在 beam_spark_job_server
日志中有此警告:
WARN org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a new Spark Context.
知道这里的问题出在哪里吗?有没有其他方法可以 运行 python Beam Pipelines on spark 而不经过容器化服务?
出现这种情况的原因可能是作业服务器中包含的 Spark 客户端版本与您要向其提交作业的 Spark 版本不匹配。
希望现在回答问题还不算太晚。是的,这是由于版本不匹配。我测试过,它只适用于 Spark 2,不适用于版本 3。如果您需要 Kubernetes 上的 运行 示例,可以参考 https://github.com/cometta/python-apache-beam-spark 。如果它对你有用,可以帮我 'Star' 存储库。随意在存储库上创建一个问题,我会调查它。