Create a DataFrame by iterating over a list column in another DataFrame
In PySpark, I have a DataFrame with a column that contains an ordered list of nodes to traverse:
osmDF.schema
Out[1]:
StructType(List(StructField(id,LongType,true),
StructField(nodes,ArrayType(LongType,true),true),
StructField(tags,MapType(StringType,StringType,true),true)))
osmDF.head(3)
Out[2]:
| id | nodes | tags |
|-----------|-----------------------------------------------------|---------------------|
| 62960871 | [783186590,783198852] | "{""foo"":""bar""}" |
| 211528816 | [2215187080,2215187140,2215187205,2215187256] | "{""foo"":""boo""}" |
| 62960872 | [783198772,783183397,783167527,783169067,783198772] | "{""foo"":""buh""}" |
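I need to create a DataFrame with one row for each pair of consecutive nodes in the list, then save it as Parquet.
For each input row with n = len(nodes), the expected result has n-1 rows. It would look like this (I will add other columns later):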
| id | from | to | tags |
|-----------------------|------------|------------|---------------------|
| 783186590_783198852 | 783186590 | 783198852 | "{""foo"":""bar""}" |
| 2215187080_2215187140 | 2215187080 | 2215187140 | "{""foo"":""boo""}" |
| 2215187140_2215187205 | 2215187140 | 2215187205 | "{""foo"":""boo""}" |
| 2215187205_2215187256 | 2215187205 | 2215187256 | "{""foo"":""boo""}" |
| 783198772_783183397 | 783198772 | 783183397 | "{""foo"":""buh""}" |
| 783183397_783167527 | 783183397 | 783167527 | "{""foo"":""buh""}" |
| 783167527_783169067 | 783167527 | 783169067 | "{""foo"":""buh""}" |
| 783169067_783198772 | 783169067 | 783198772 | "{""foo"":""buh""}" |
I tried to start with the following:
from pyspark.sql.functions import udf

def split_ways_into_arcs(row):
    arcs = []
    # Pair each node with the one that follows it
    for node in range(len(row['nodes']) - 1):
        arc = dict()
        arc['id'] = str(row['nodes'][node]) + "_" + str(row['nodes'][node + 1])
        arc['from'] = row['nodes'][node]
        arc['to'] = row['nodes'][node + 1]
        arc['tags'] = row['tags']
        arcs.append(arc)
    return arcs

# Declare function as udf
split = udf(lambda row: split_ways_into_arcs(row.asDict()))
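The problem I'm running into is that I don't know how many nodes each row of the original DataFrame contains.
I know how to apply a udf to add a column to an existing DataFrame, but not how to create a new DataFrame from lists of dicts.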
Use transform to iterate over the nodes array, then explode the resulting array:
from pyspark.sql import functions as F

df = ...

# Pair each node with its successor, explode to one row per pair, and drop
# the trailing pair whose "to" is null (nodes[i+1] past the end of the array
# yields null when ANSI mode is off).
df.withColumn("nodes", F.expr("transform(nodes, (n, i) -> named_struct('from', nodes[i], 'to', nodes[i+1]))")) \
    .withColumn("nodes", F.explode("nodes")) \
    .filter("not nodes.to is null") \
    .selectExpr("concat_ws('_', nodes.from, nodes.to) as id", "nodes.*", "tags") \
    .show(truncate=False)
Output:
+---------------------+----------+----------+-----------------+
|id                   |from      |to        |tags             |
+---------------------+----------+----------+-----------------+
|783186590_783198852  |783186590 |783198852 |{""foo"":""bar""}|
|2215187080_2215187140|2215187080|2215187140|{""foo"":""boo""}|
|2215187140_2215187205|2215187140|2215187205|{""foo"":""boo""}|
|2215187205_2215187256|2215187205|2215187256|{""foo"":""boo""}|
|783198772_783183397  |783198772 |783183397 |{""foo"":""buh""}|
|783183397_783167527  |783183397 |783167527 |{""foo"":""buh""}|
|783167527_783169067  |783167527 |783169067 |{""foo"":""buh""}|
|783169067_783198772  |783169067 |783198772 |{""foo"":""buh""}|
+---------------------+----------+----------+-----------------+
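The per-row logic of the answer (pair each node with its successor and drop the unmatched last element) can be sanity-checked without a Spark session. Below is a minimal pure-Python sketch, not part of the original answer. The names split_way_into_arcs, ways and arcs are hypothetical, the sample rows are copied from the question, and tags are assumed to be plain dicts because the schema declares a MapType. The ids follow the from_to order of the expected table in the question.

```python
# Sample rows from the question: (way id, ordered node ids, tags).
# Representing tags as a dict is an assumption based on the MapType schema.
ways = [
    (62960871, [783186590, 783198852], {"foo": "bar"}),
    (211528816, [2215187080, 2215187140, 2215187205, 2215187256], {"foo": "boo"}),
    (62960872, [783198772, 783183397, 783167527, 783169067, 783198772], {"foo": "buh"}),
]


def split_way_into_arcs(nodes, tags):
    """Return one dict per pair of consecutive nodes (hypothetical helper)."""
    # zip stops at the shorter input, so the last node never starts an arc.
    # This plays the role of the "to is not null" filter in the Spark version.
    return [
        {"id": f"{start}_{end}", "from": start, "to": end, "tags": tags}
        for start, end in zip(nodes, nodes[1:])
    ]


# Flatten the per-way lists, which is what explode does in the Spark pipeline.
arcs = [arc for _, nodes, tags in ways for arc in split_way_into_arcs(nodes, tags)]

for arc in arcs:
    print(arc["id"], arc["from"], arc["to"], arc["tags"])
```

A way with n nodes contributes n-1 arcs, so the three sample ways give 1 + 3 + 4 = 8 rows, the same count as the expected table. Ways with zero or one node contribute no rows.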