PySpark:在 Pandas UDF 中使用 pyproj 包时如何解决 'python worker failed to connect back' 错误? (将 lat/long 转换为 UTM 坐标)
PySpark: How do I solve 'python worker failed to connect back' error when using pyproj package in Pandas UDF? (Converting lat/long to UTM coordinates)
我有一个坐标为 lat/long 的 json 文件,我尝试在 PySpark 中将其转换为 UTM("x"、"y")。
.json 文件如下所示:
{"positionmessage":{"latitude": 51.822872161865234,"longitude": 4.905614852905273}}
{"positionmessage":{"latitude": 51.819644927978516, "longitude": 4.961687088012695}}
我在 pyspark 中读取了 json 文件,并尝试使用以下脚本在 PySpark 中转换为 UTM('x'、'y'-coord):
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, DateType, FloatType, TimestampType, DoubleType
from pyspark.sql.functions import *
appName = "PySpark"
master = "local"
file_name = "lat_lon.JSON"
# Create Spark session
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.getOrCreate()
schema = StructType([
StructField("positionmessage",
StructType([
StructField('latitude', DoubleType(), True),
StructField('longitude', DoubleType(), True),
]))])
df = spark.read.schema(schema).json(file_name).select("positionmessage.*")
到这里没问题;当我尝试使用 pyproj 包(在 Pandas 中工作)转换为 UTM 坐标时出现问题。
from pyspark.sql.functions import array, pandas_udf, PandasUDFType
from pyproj import Proj
from pandas import Series
# using decorator 'pandas_udf' to wrap the function.
@pandas_udf('array<double>', PandasUDFType.SCALAR)
def get_utm(x):
pp = Proj(proj='utm',zone=31,ellps='WGS84', preserve_units=False)
return Series([ pp(e[0], e[1]) for e in x ])
df = df.withColumn('utm', get_utm(array('longitude','latitude'))) \
.selectExpr("*", "utm[0] as X", "utm[1] as Y")
df.show()
我遇到了问题:“python 工作人员无法重新连接”,但代码本身似乎没有问题。问题是什么?
您可以使用普通 UDF 而不是 Pandas UDF:
@udf(returnType=ArrayType(DoubleType()))
def get_utm(long, lat):
pp = Proj(proj='utm', zone=31, ellps='WGS84', preserve_units=False)
return pp(long, lat)
result = df.withColumn('utm', get_utm('longitude','latitude')).selectExpr("*", "utm[0] as X", "utm[1] as Y")
我有一个坐标为 lat/long 的 json 文件,我尝试在 PySpark 中将其转换为 UTM("x"、"y")。 .json 文件如下所示:
{"positionmessage":{"latitude": 51.822872161865234,"longitude": 4.905614852905273}}
{"positionmessage":{"latitude": 51.819644927978516, "longitude": 4.961687088012695}}
我在 pyspark 中读取了 json 文件,并尝试使用以下脚本在 PySpark 中转换为 UTM('x'、'y'-coord):
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, DateType, FloatType, TimestampType, DoubleType
from pyspark.sql.functions import *
appName = "PySpark"
master = "local"
file_name = "lat_lon.JSON"
# Create Spark session
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.getOrCreate()
schema = StructType([
StructField("positionmessage",
StructType([
StructField('latitude', DoubleType(), True),
StructField('longitude', DoubleType(), True),
]))])
df = spark.read.schema(schema).json(file_name).select("positionmessage.*")
到这里没问题;当我尝试使用 pyproj 包(在 Pandas 中工作)转换为 UTM 坐标时出现问题。
from pyspark.sql.functions import array, pandas_udf, PandasUDFType
from pyproj import Proj
from pandas import Series
# using decorator 'pandas_udf' to wrap the function.
@pandas_udf('array<double>', PandasUDFType.SCALAR)
def get_utm(x):
pp = Proj(proj='utm',zone=31,ellps='WGS84', preserve_units=False)
return Series([ pp(e[0], e[1]) for e in x ])
df = df.withColumn('utm', get_utm(array('longitude','latitude'))) \
.selectExpr("*", "utm[0] as X", "utm[1] as Y")
df.show()
我遇到了问题:“python 工作人员无法重新连接”,但代码本身似乎没有问题。问题是什么?
您可以使用普通 UDF 而不是 Pandas UDF:
@udf(returnType=ArrayType(DoubleType()))
def get_utm(long, lat):
pp = Proj(proj='utm', zone=31, ellps='WGS84', preserve_units=False)
return pp(long, lat)
result = df.withColumn('utm', get_utm('longitude','latitude')).selectExpr("*", "utm[0] as X", "utm[1] as Y")