Using pyspark, how to add a column to a DataFrame as a key-value map of multiple known columns in the same DataFrame excluding nulls?
Given the following example:
d = [{'asset': '2', 'ts': 6, 'B': '123', 'C': '234'},
     {'asset': '1', 'ts': 5, 'C.1': '999', 'B': '888', 'F': '999'}]
df = spark.createDataFrame(d)
df.show(truncate=False)
+---+----+-----+---+----+----+
|B |C |asset|ts |C.1 |F |
+---+----+-----+---+----+----+
|123|234 |2 |6 |null|null|
|888|null|1 |5 |999 |999 |
+---+----+-----+---+----+----+
I want to produce the following output:
+-----+---+--------------------------------+
|asset|ts |signals |
+-----+---+--------------------------------+
|2 |6 |[B -> 123, C -> 234] |
|1 |5 |[B -> 888, C.1 -> 999, F -> 999]|
+-----+---+--------------------------------+
I tried the following:
from itertools import chain
from pyspark.sql.functions import *
all_signals=['B','C','C.1','F']
key_values = create_map(*(chain(*[(lit(name), col("`"+name+"`"))
                                  for name in all_signals])))
new_df = df.withColumn('signals',key_values).drop(*all_signals).show(truncate=False)
+-----+---+--------------------------------------+
|asset|ts |signals |
+-----+---+--------------------------------------+
|2 |6 |[B -> 123, C -> 234, C.1 ->, F ->] |
|1 |5 |[B -> 888, C ->, C.1 -> 999, F -> 999]|
+-----+---+--------------------------------------+
But I don't want keys with null values, so I tried many ways to exclude null/None. I tried "if" conditions and when/otherwise, but none of them seemed to work. Here is one attempt:
key_values = create_map(*(chain(*[(lit(name), col("`"+name+"`"))
                                  for name in all_signals
                                  if col("`"+name+"`").isNotNull()])))
new_df = df.withColumn('signals',key_values).drop(*all_signals).show(truncate=False)
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
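As far as I can tell, this fails because isNotNull() returns a Column expression, and the comprehension's "if" needs a plain Python boolean while the query is still being built, before any row data exists, so the null check has to happen per row inside a Spark expression instead. A minimal illustration (assuming an active SparkSession):
from pyspark.sql.functions import col

cond = col("`C.1`").isNotNull()
print(type(cond))  # <class 'pyspark.sql.column.Column'>, not a bool
# bool(cond)       # raises the exact ValueError shown above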
I got it to work with a round trip through JSON, which I'm not happy with:
new_df = df.withColumn("signals", from_json(
    to_json(struct(["`"+x+"`" for x in all_signals])), "MAP<STRING,STRING>"))
new_df = new_df.drop(*all_signals)
new_df.show(truncate=False)
+-----+---+--------------------------------+
|asset|ts |signals |
+-----+---+--------------------------------+
|2 |6 |[B -> 123, C -> 234] |
|1 |5 |[B -> 888, C.1 -> 999, F -> 999]|
+-----+---+--------------------------------+
But there must be a way to exclude nulls without going to JSON and back!
I have an alternative solution: first build the map including the null values, then drop the null entries.
from pyspark.sql.types import MapType, StringType
from pyspark.sql import functions as F
# Original dataframe
data = [{'asset': '2', 'ts': 6, 'B': '123', 'C': '234'},
        {'asset': '1', 'ts': 5, 'C.1': '999', 'B': '888', 'F': '999'}]
df = spark.createDataFrame(data)
df.show(truncate=False)
# Create a map that includes null values
# Backticks are needed because spark is weird
#
names = ['B', 'C', 'C.1', 'F']
key_value_list = []
for name in names:
    key_value_list += [F.lit(name)]
    key_value_list += [df["`{}`".format(name)]]
map_column = F.create_map(*key_value_list)
# UDF that drops null values
remove_null_values_udf = F.udf(
    lambda d: {k: v for k, v in d.items() if v is not None},
    MapType(StringType(), StringType())
)
# Apply both of the above
df = df.withColumn('map', remove_null_values_udf(map_column)).drop(*names)
df.show()
# +-----+---+--------------------+
# |asset| ts| map|
# +-----+---+--------------------+
# | 2| 6|[B -> 123, C -> 234]|
# | 1| 5|[B -> 888, F -> 9...|
# +-----+---+--------------------+
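If your Spark version has the built-in map_filter higher-order function (available as a SQL expression in recent releases), the UDF can be skipped entirely: build the same null-containing map and drop the null entries in one expression. A minimal sketch, reusing the names and map_column defined above on the original dataframe:
# Alternative sketch: drop the null entries with the built-in map_filter
# higher-order function instead of a Python UDF (assumes map_filter is
# available as a SQL expression in your Spark version)
df_no_udf = (
    df.withColumn('map', map_column)
      .withColumn('map', F.expr("map_filter(map, (k, v) -> v IS NOT NULL)"))
      .drop(*names)
)
df_no_udf.show(truncate=False)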
No UDF needed. Use the higher-order function filter together with arrays_zip and map_from_entries to get the output you want (Spark 2.4+):
from pyspark.sql import functions as F
all_signals = ['B', 'C', 'C.1', 'F']
df.withColumn("all_one", F.array(*[F.lit(x) for x in all_signals]))\
.withColumn("all_two", F.array(*["`"+x+"`" for x in all_signals]))\
.withColumn("signals", F.expr("""map_from_entries(filter(arrays_zip(all_one,all_two),x-> x.all_two is not null))"""))\
.drop("all_one","all_two").show(truncate=False)
#+---+----+-----+---+----+----+--------------------------------+
#|B |C |asset|ts |C.1 |F |signals |
#+---+----+-----+---+----+----+--------------------------------+
#|123|234 |2 |6 |null|null|[B -> 123, C -> 234] |
#|888|null|1 |5 |999 |999 |[B -> 888, C.1 -> 999, F -> 999]|
#+---+----+-----+---+----+----+--------------------------------+
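On Spark 3.1+, the same expression can also be written with the Python APIs (pyspark.sql.functions.filter accepts a lambda there), avoiding the F.expr string; a rough equivalent sketch:
from pyspark.sql import functions as F

all_signals = ['B', 'C', 'C.1', 'F']
result = (
    df.withColumn("keys", F.array(*[F.lit(x) for x in all_signals]))
      .withColumn("values", F.array(*["`" + x + "`" for x in all_signals]))
      .withColumn(
          "signals",
          # keep only the (key, value) structs whose value is not null,
          # then turn the surviving entries into a map
          F.map_from_entries(
              F.filter(
                  F.arrays_zip("keys", "values"),
                  lambda e: e["values"].isNotNull()
              )
          )
      )
      .drop("keys", "values", *all_signals)
)
result.show(truncate=False)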