如何在pyspark中找到Dataframe列是一对一或一对多映射?
How to find Dataframe columns are one to one or one to many mapping in pyspark?
我有一个如下所示的数据框:
df0 = sc.parallelize([
(1, 3),
(2, 3),
(1, 2)
]).toDF(["id",'t'])
当我执行 show() 时:
df0.show()
+---+---+
| id| t|
+---+---+
| 1| 3|
| 2| 3|
| 1| 2|
+---+---+
我想确定列 id、t 之间的关系。
在给出的 df0 中,id 列和 t 列之间的关系是一对多的,因为 id 值 1 同时映射到 t 值 3 和 2,即 (1,3) 和 (1,2)。
我的预期输出如下:
+---+---+---+
|idt| id| t|
+---+---+---+
| id| OO| OM|
| t| OM| OO|
+---+---+---+
你可以通过分组和计数来做到这一点。
from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.types import *
def relation_type(df, fromCol, toCol):
    """Return a one-row DataFrame with a 'mapping' column that is 'OM'
    (one-to-many) when some fromCol value maps to more than one distinct
    toCol value, and 'OO' (one-to-one) otherwise."""
    # Distinct toCol values seen for each fromCol value.
    distinct_per_key = df.groupBy(fromCol).agg(
        F.countDistinct(toCol).alias('val_count')
    )
    # The worst case over all keys decides the relationship kind.
    worst_case = distinct_per_key.agg(
        F.max('val_count').alias('max_rel_count')
    )
    labeled = worst_case.withColumn(
        'mapping',
        when(F.col('max_rel_count') > 1, 'OM').otherwise('OO'),
    )
    return labeled.drop('max_rel_count')
def relation_types(df, cols):
    """Build a relationship matrix for the given columns of df.

    Returns a DataFrame with a 'ColName' label column plus one column per
    name in cols; cell (i, j) holds the mapping type ('OO' or 'OM') from
    column i to column j as computed by relation_type.
    """
    schemaArr = [StructField('ColName', StringType(), True)]
    for i in cols:
        schemaArr.append(StructField(i, StringType(), True))
    schema = StructType(schemaArr)
    # Collect every row locally first and create the result DataFrame once:
    # union-ing a fresh one-row DataFrame per cell (as a naive loop would)
    # launches a Spark job per union and grows the query lineage, and the
    # per-row createDataFrame relies on positional schema matching.
    rows = []
    for i in cols:
        rowDict = [i]
        for j in cols:
            val = relation_type(df, i, j).collect()[0]
            rowDict.append(val['mapping'])
        rows.append(rowDict)
    return sqlContext.createDataFrame(rows, schema)
然后用你想要的列调用它
relation_types(df, ['id', 't']).show()
结果
+-------+---+---+
|ColName| id| t|
+-------+---+---+
| id| OO| OM|
| t| OM| OO|
+-------+---+---+
我有一个如下所示的数据框:
df0 = sc.parallelize([
(1, 3),
(2, 3),
(1, 2)
]).toDF(["id",'t'])
当我执行 show() 时:
df0.show()
+---+---+
| id| t|
+---+---+
| 1| 3|
| 2| 3|
| 1| 2|
+---+---+
我想确定列 id、t 之间的关系。
在给出的 df0 中,id 列和 t 列之间的关系是一对多的,因为 id 值 1 同时映射到 t 值 3 和 2,即 (1,3) 和 (1,2)。
我的预期输出如下:
+---+---+---+
|idt| id| t|
+---+---+---+
| id| OO| OM|
| t| OM| OO|
+---+---+---+
你可以通过分组和计数来做到这一点。
from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.types import *
def relation_type(df, fromCol, toCol):
    """Classify the fromCol -> toCol relationship of df.

    Produces a single-row DataFrame whose 'mapping' column is 'OM' if any
    value of fromCol is paired with more than one distinct toCol value,
    otherwise 'OO'.
    """
    max_distinct = (
        df.groupBy(fromCol)
        .agg(F.countDistinct(toCol).alias('val_count'))
        .agg(F.max('val_count').alias('max_rel_count'))
    )
    is_one_to_many = max_distinct['max_rel_count'] > 1
    return (
        max_distinct
        .withColumn('mapping', when(is_one_to_many, 'OM').otherwise('OO'))
        .drop('max_rel_count')
    )
def relation_types(df, cols):
    """Return a matrix DataFrame of pairwise column relationships.

    The result has a 'ColName' label column and one column per entry of
    cols; cell (i, j) is relation_type's verdict ('OO' or 'OM') for the
    mapping from column i to column j.
    """
    schemaArr = [StructField('ColName', StringType(), True)]
    for i in cols:
        schemaArr.append(StructField(i, StringType(), True))
    schema = StructType(schemaArr)
    # Gather all cell values on the driver and build the DataFrame in one
    # call: appending one-row DataFrames with union would trigger a Spark
    # job per union, grow the lineage, and depend on positional schema
    # inference for each single-row frame.
    rows = []
    for i in cols:
        rowDict = [i]
        for j in cols:
            val = relation_type(df, i, j).collect()[0]
            rowDict.append(val['mapping'])
        rows.append(rowDict)
    return sqlContext.createDataFrame(rows, schema)
然后用你想要的列调用它
relation_types(df, ['id', 't']).show()
结果
+-------+---+---+
|ColName| id| t|
+-------+---+---+
| id| OO| OM|
| t| OM| OO|
+-------+---+---+