pyspark 数据框和多边形(geopandas)之间的空间连接
Spatial Join between pyspark dataframe and polygons (geopandas)
问题:
我想在以下之间进行空间连接:
- 一个大的 Spark Dataframe(500M 行),有 个点(例如道路上的点)
- 一个带有多边形(例如区域边界)的小型geojson(20000个形状)。
这是我目前所拥有的,我发现它很慢(很多调度程序延迟,可能是因为 communes 没有广播):
@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes(traces):
geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs = communes.crs)
joined_df = gpd.sjoin(gdf_traces, communes, how='left', op='within')
return joined_df[columns]
pandas_udf 将 points 数据帧(轨迹)的一部分作为 pandas 数据帧,将其转换为带有 geo[= 的 GeoDataFrame 50=],并与 polygons GeoDataFrame 进行空间连接(因此受益于 Geopandas 的 Rtree 连接)
问题:
有没有办法让它更快?我知道我的 communes 地理数据框在 Spark 驱动程序的内存中,并且每个工作人员都必须在每次调用 udf 时下载它,这是正确的吗?
但是我不知道如何让工作人员直接使用这个 GeoDataFrame(如在广播连接中)
有什么想法吗?
一年后,这就是我最终按照 @ndricca 的建议所做的事情,诀窍是广播公社,但你不能广播 GeoDataFrame
目录,所以你必须加载它作为 Spark DataFrame,然后在广播之前将其转换为 JSON。然后使用 shapely.wkt
(众所周知的文本:一种将几何对象编码为文本的方法)
在 UDF 中重建 GeoDataFrame
另一个技巧是在 groupby 中使用 salt 来确保数据在集群中的平等重新分配
import geopandas as gpd
from shapely import wkt
from pyspark.sql.functions import broadcast
communes = gpd.load_file('...communes.geojson')
# Use a previously created spark session
traces= spark_session.read_csv('trajectoires.csv')
communes_spark = spark.createDataFrame(communes[['insee_comm', 'wkt']])
communes_json = provinces_spark.toJSON().collect()
communes_bc = spark.sparkContext.broadcast(communes_json)
@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes_bc(traces):
communes = pd.DataFrame.from_records([json.loads(c) for c in communes_bc.value])
polygons = [wkt.loads(w) for w in communes['wkt']]
gdf_communes = gpd.GeoDataFrame(communes, geometry=polygons, crs=crs )
geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
gdf_traces = gpd.GeoDataFrame(traces , geometry=geometry, crs=crs)
joined_df = gpd.sjoin(gdf_traces, gdf_communes, how='left', op='within')
return joined_df[columns]
traces = traces.groupby(salt).apply(join_communes_bc)
问题:
我想在以下之间进行空间连接:
- 一个大的 Spark Dataframe(500M 行),有 个点(例如道路上的点)
- 一个带有多边形(例如区域边界)的小型geojson(20000个形状)。
这是我目前所拥有的,我发现它很慢(很多调度程序延迟,可能是因为 communes 没有广播):
@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes(traces):
geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs = communes.crs)
joined_df = gpd.sjoin(gdf_traces, communes, how='left', op='within')
return joined_df[columns]
pandas_udf 将 points 数据帧(轨迹)的一部分作为 pandas 数据帧,将其转换为带有 geo[= 的 GeoDataFrame 50=],并与 polygons GeoDataFrame 进行空间连接(因此受益于 Geopandas 的 Rtree 连接)
问题:
有没有办法让它更快?我知道我的 communes 地理数据框在 Spark 驱动程序的内存中,并且每个工作人员都必须在每次调用 udf 时下载它,这是正确的吗?
但是我不知道如何让工作人员直接使用这个 GeoDataFrame(如在广播连接中)
有什么想法吗?
一年后,这就是我最终按照 @ndricca 的建议所做的事情,诀窍是广播公社,但你不能广播 GeoDataFrame
目录,所以你必须加载它作为 Spark DataFrame,然后在广播之前将其转换为 JSON。然后使用 shapely.wkt
(众所周知的文本:一种将几何对象编码为文本的方法)
GeoDataFrame
另一个技巧是在 groupby 中使用 salt 来确保数据在集群中的平等重新分配
import geopandas as gpd
from shapely import wkt
from pyspark.sql.functions import broadcast
communes = gpd.load_file('...communes.geojson')
# Use a previously created spark session
traces= spark_session.read_csv('trajectoires.csv')
communes_spark = spark.createDataFrame(communes[['insee_comm', 'wkt']])
communes_json = provinces_spark.toJSON().collect()
communes_bc = spark.sparkContext.broadcast(communes_json)
@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes_bc(traces):
communes = pd.DataFrame.from_records([json.loads(c) for c in communes_bc.value])
polygons = [wkt.loads(w) for w in communes['wkt']]
gdf_communes = gpd.GeoDataFrame(communes, geometry=polygons, crs=crs )
geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
gdf_traces = gpd.GeoDataFrame(traces , geometry=geometry, crs=crs)
joined_df = gpd.sjoin(gdf_traces, gdf_communes, how='left', op='within')
return joined_df[columns]
traces = traces.groupby(salt).apply(join_communes_bc)