Pyspark:检查元组列表中是否包含元组
Pyspark: check if a tuple is contained in a list of tuples
我正在尝试分析来自 2 个不同来源(A 和 B)的数据的可靠性。由于字段范围相当不均等,我主要关注公共字段和 运行 比较。
这里我选择了价格和数量,并希望确保元组 [priceA, quantityA] 包含在我的元组列表 [[price1B, quantity1B], [price2B, quantity2B], .. ] 来自源 B。
我试图创建一个 udf 来查看其他参考资料,但我刚刚开始使用 Pyspark,我并不真正了解如何定义我的 udf 和在给定情况下指定的适当数据类型。
我的 2 个独立来源有 2 个数据框
我为每个 df 添加了一个新列 "combined" : StructField(combined_a,ArrayType(IntegerType,true),false)))
df_a = df_a.withColumn("combined_a", array("Quantity", "PRICE"))
并创建了一个唯一元组列表:
list_a = list(df_a.select("combined_a").distinct().toPandas()["combined_a"])
输出list_a
list_a = [ [81.0, 100.0], [56.0, 6.0], [10000.0, 45.32], [42.0, 6.0] ...]
我找不到任何可以满足我的要求的内置函数:我想附加一个布尔类型的新列 "combinaison_in_b"。试过:
df_a = df_a.withColumn('combinaison_in_b_found' , col('combined_a').isin(list_b))
Returns 跟随错误
An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [50, 51]
继续使用 udf。试过:
def IsInDataframe(combined_a , list_b):
found = TRUE
for c in combined_a
if c not in list_b:
found = False
if found:
return True
else:
return False
def udf_append(list_b):
return udf(lambda combined_a : IsInDataframe(combined_a , list_b))
df_a.withColumn("combinaison_in_b_found", udf_append(list_b)(col("combined_a"))).cast('boolean')
(udf 语法取自
如果有人能解释一下 return udf)
的部分,我将不胜感激
我想将我的 df 作为附加列 "combinaison_in_b_found" True/False 的输出。
_______________________________________________
id | combined_a | combinaison_in_b_found
1 | [81.0, 100.0] | false
2 | [56.0, 6.0] | true
...
试试这个:
df_a = spark.createDataFrame([(1,[81.0, 100.0]), (1, [56.0, 6.0]),(3,[77.0, 88.0]), (4,[42., 8.])], ('id', 'combined_a') )
df_a.show()
list_b = [ [81.0, 100.0], [56.0, 6.0], [10000.0, 45.32], [42.0, 6.0]]
print('list_b: {}'.format(list_b))
my_udf = udf(lambda pair: 'true' if pair in list_b else 'false', StringType())
df_a = df_a.withColumn('combinaison_in_b_found', my_udf(df_a['combined_a']))
df_a.show()
这是输出:
+---+-------------+
| id| combined_a|
+---+-------------+
| 1|[81.0, 100.0]|
| 1| [56.0, 6.0]|
| 3| [77.0, 88.0]|
| 4| [42.0, 8.0]|
+---+-------------+
list_b: [[81.0, 100.0], [56.0, 6.0], [10000.0, 45.32], [42.0, 6.0]]
+---+-------------+----------------------+
| id| combined_a|combinaison_in_b_found|
+---+-------------+----------------------+
| 1|[81.0, 100.0]| true|
| 1| [56.0, 6.0]| true|
| 3| [77.0, 88.0]| false|
| 4| [42.0, 8.0]| false|
+---+-------------+----------------------+
我正在尝试分析来自 2 个不同来源(A 和 B)的数据的可靠性。由于字段范围相当不均等,我主要关注公共字段和 运行 比较。
这里我选择了价格和数量,并希望确保元组 [priceA, quantityA] 包含在我的元组列表 [[price1B, quantity1B], [price2B, quantity2B], .. ] 来自源 B。
我试图创建一个 udf 来查看其他参考资料,但我刚刚开始使用 Pyspark,我并不真正了解如何定义我的 udf 和在给定情况下指定的适当数据类型。
我的 2 个独立来源有 2 个数据框
我为每个 df 添加了一个新列 "combined" : StructField(combined_a,ArrayType(IntegerType,true),false)))
df_a = df_a.withColumn("combined_a", array("Quantity", "PRICE"))
并创建了一个唯一元组列表:
list_a = list(df_a.select("combined_a").distinct().toPandas()["combined_a"])
输出list_a
list_a = [ [81.0, 100.0], [56.0, 6.0], [10000.0, 45.32], [42.0, 6.0] ...]
我找不到任何可以满足我的要求的内置函数:我想附加一个布尔类型的新列 "combinaison_in_b"。试过:
df_a = df_a.withColumn('combinaison_in_b_found' , col('combined_a').isin(list_b))
Returns 跟随错误
An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [50, 51]
继续使用 udf。试过:
def IsInDataframe(combined_a , list_b):
found = TRUE
for c in combined_a
if c not in list_b:
found = False
if found:
return True
else:
return False
def udf_append(list_b):
return udf(lambda combined_a : IsInDataframe(combined_a , list_b))
df_a.withColumn("combinaison_in_b_found", udf_append(list_b)(col("combined_a"))).cast('boolean')
(udf 语法取自
如果有人能解释一下 return udf)
的部分,我将不胜感激我想将我的 df 作为附加列 "combinaison_in_b_found" True/False 的输出。
_______________________________________________
id | combined_a | combinaison_in_b_found
1 | [81.0, 100.0] | false
2 | [56.0, 6.0] | true
...
试试这个:
df_a = spark.createDataFrame([(1,[81.0, 100.0]), (1, [56.0, 6.0]),(3,[77.0, 88.0]), (4,[42., 8.])], ('id', 'combined_a') )
df_a.show()
list_b = [ [81.0, 100.0], [56.0, 6.0], [10000.0, 45.32], [42.0, 6.0]]
print('list_b: {}'.format(list_b))
my_udf = udf(lambda pair: 'true' if pair in list_b else 'false', StringType())
df_a = df_a.withColumn('combinaison_in_b_found', my_udf(df_a['combined_a']))
df_a.show()
这是输出:
+---+-------------+
| id| combined_a|
+---+-------------+
| 1|[81.0, 100.0]|
| 1| [56.0, 6.0]|
| 3| [77.0, 88.0]|
| 4| [42.0, 8.0]|
+---+-------------+
list_b: [[81.0, 100.0], [56.0, 6.0], [10000.0, 45.32], [42.0, 6.0]]
+---+-------------+----------------------+
| id| combined_a|combinaison_in_b_found|
+---+-------------+----------------------+
| 1|[81.0, 100.0]| true|
| 1| [56.0, 6.0]| true|
| 3| [77.0, 88.0]| false|
| 4| [42.0, 8.0]| false|
+---+-------------+----------------------+