How to map each column to the other columns in a PySpark DataFrame?
I have created a DataFrame by executing the following code.
from pyspark.sql import Row

l = [('Ankit', 25, 'Ankit', 'Ankit'), ('Jalfaizy', 22, 'Jalfaizy', 'aa'),
     ('saurabh', 20, 'saurabh', 'bb'), ('Bala', 26, 'aa', 'bb')]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]), lname=x[2], mname=x[3]))
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.show()
After executing the above code, my result is as follows.
+---+--------+-----+--------+
|age| lname|mname| name|
+---+--------+-----+--------+
| 25| Ankit|Ankit| Ankit|
| 22|Jalfaizy| aa|Jalfaizy|
| 20| saurabh| bb| saurabh|
| 26| aa| bb| Bala|
+---+--------+-----+--------+
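(For context: the snippet above assumes an interactive environment such as the PySpark shell, where sc and sqlContext already exist. In a standalone script they could be set up roughly like this; a minimal sketch, assuming Spark 2+ where SparkSession subsumes the old SQLContext, and with a hypothetical app name:)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("map-columns-example").getOrCreate()
sc = spark.sparkContext
sqlContext = spark  # in Spark 2+, SparkSession provides createDataFrame directly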
But I want to compare each column's value within every row and show, alongside the age column, which of the other columns hold the same value. My expected result looks like this:
+---+----------------+-------------------+------------------+
|age| lname_map_same | mname_map_same | name_map_same |
+---+----------------+-------------------+------------------+
| 25| mname,name | lname,name | lname,mname |
| 22| name | none | lname |
| 20| name | none | lname |
| 26| none | none | none |
+---+----------------+-------------------+------------------+
You can solve your problem with a map function. Take a look at the following code:
df_new = spark.createDataFrame([
    (25, "Ankit", "Ankit", "Ankit"), (22, "Jalfaizy", "aa", "Jalfaizy"), (26, "aa", "bb", "Bala")
], ("age", "lname", "mname", "name"))
# only 3 records added to the dataset

def find_identical(row):
    labels = ["lname", "mname", "name"]
    result = [row[0]]  # save the age for the final result
    row = row[1:]      # drop the age from the row
    for i in range(3):
        s = []
        field = row[i]
        if field == row[(i + 1) % 3]:  # check whether field is identical to the next field
            s.append(labels[(i + 1) % 3])
        if field == row[(i - 1) % 3]:  # check whether field is identical to the previous field
            s.append(labels[(i - 1) % 3])
        if not s:  # if no identical values were found, return None
            s = None
        result.append(s)
    return result

df_new.rdd.map(find_identical).toDF(["age", "lname_map_same", "mname_map_same", "name_map_same"]).show()
Output:
+---+--------------+--------------+--------------+
|age|lname_map_same|mname_map_same| name_map_same|
+---+--------------+--------------+--------------+
| 25| [mname, name]| [name, lname]|[lname, mname]|
| 22| [name]| null| [lname]|
| 26| null| null| null|
+---+--------------+--------------+--------------+
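The same per-row logic can also be sketched with a list comprehension instead of the modulo arithmetic, comparing each field against all the others directly (a minimal variant under the same column order, not the answer's original code):

def find_identical(row):
    labels = ["lname", "mname", "name"]
    values = row[1:]   # drop the age
    result = [row[0]]  # keep the age as the first output field
    for i, field in enumerate(values):
        # collect the labels of all other columns holding the same value
        same = [labels[j] for j in range(len(values)) if j != i and values[j] == field]
        result.append(same if same else None)
    return result

This produces the same rows as above; only the ordering inside each list may differ, since the modulo version visits the "next" neighbor before the "previous" one.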
If you want 5 columns to be considered, you can follow the pattern indicated in the comments: extend the labels list and add extra if statements, adjust all the modulo operations to 5, and let the for loop iterate over 5 elements. Note that (i - 1) % 5 is equivalent to (i + 4) % 5, so the four if statements together cover all four other columns. The resulting code looks like this:
df_new = spark.createDataFrame([
    (25, "Ankit", "Ankit", "Ankit", "Ankit", "Ankit"), (22, "Jalfaizy", "aa", "Jalfaizy", "Jalfaizy", "aa"), (26, "aa", "bb", "Bala", "cc", "dd")
], ("age", "lname", "mname", "name", "n1", "n2"))

def find_identical(row):
    labels = ["lname", "mname", "name", "n1", "n2"]
    result = [row[0]]
    row = row[1:]
    for i in range(5):
        s = []
        field = row[i]
        if field == row[(i + 1) % 5]:
            s.append(labels[(i + 1) % 5])
        if field == row[(i - 1) % 5]:
            s.append(labels[(i - 1) % 5])
        if field == row[(i + 2) % 5]:
            s.append(labels[(i + 2) % 5])
        if field == row[(i + 3) % 5]:
            s.append(labels[(i + 3) % 5])
        if not s:
            s = None
        result.append(s)
    return result

df_new.rdd.map(find_identical).toDF(["age", "lname_map_same", "mname_map_same", "name_map_same", "n1_map_same", "n2_map_same"]).show(truncate=False)
Output:
+---+---------------------+---------------------+----------------------+------------------------+------------------------+
|age|lname_map_same |mname_map_same |name_map_same |n1_map_same |n2_map_same |
+---+---------------------+---------------------+----------------------+------------------------+------------------------+
|25 |[mname, n2, name, n1]|[name, lname, n1, n2]|[n1, mname, n2, lname]|[n2, name, lname, mname]|[lname, n1, mname, name]|
|22 |[name, n1] |[n2] |[n1, lname] |[name, lname] |[mname] |
|26 |null |null |null |null |null |
+---+---------------------+---------------------+----------------------+------------------------+------------------------+
A dynamic approach takes the number of columns as a parameter. In my example, though, the number should be between 1 and 5, because the dataset was created with at most 5 attributes. It could look like this:
df_new = spark.createDataFrame([
    (25, "Ankit", "Ankit", "Ankit", "Ankit", "Ankit"), (22, "Jalfaizy", "aa", "Jalfaizy", "Jalfaizy", "aa"), (26, "aa", "bb", "Bala", "cc", "dd")
], ("age", "n1", "n2", "n3", "n4", "n5"))

def find_identical(row, number):
    labels = []
    for n in range(1, number + 1):
        labels.append("n" + str(n))  # create the labels dynamically
    result = [row[0]]
    row = row[1:]
    for i in range(number):
        s = []
        field = row[i]
        for x in range(1, number):
            if field == row[(i + x) % number]:
                s.append(labels[(i + x) % number])  # check for matches in all the other fields
        if not s:
            s = None
        result.append(s)
    return result

number = 4
colNames = ["age"]
for x in range(1, number + 1):
    colNames.append("n" + str(x) + "_same")  # create the 'nX_same' column names

df_new.rdd.map(lambda r: find_identical(r, number)).toDF(colNames).show(truncate=False)
Depending on the number parameter the output differs; I keep the age column statically as the first column.
Output:
+---+------------+------------+------------+------------+
|age|n1_same |n2_same |n3_same |n4_same |
+---+------------+------------+------------+------------+
|25 |[n2, n3, n4]|[n3, n4, n1]|[n4, n1, n2]|[n1, n2, n3]|
|22 |[n3, n4] |null |[n4, n1] |[n1, n3] |
|26 |null |null |null |null |
+---+------------+------------+------------+------------+
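If you would rather stay in the DataFrame API instead of dropping down to the RDD, a similar result can be sketched with array column expressions. This is an assumption-laden sketch: it requires Spark 3.1+, where pyspark.sql.functions.filter accepts a Python lambda, and reuses the df_new with columns n1..n5 from above:

from pyspark.sql import functions as F

cols = ["n1", "n2", "n3", "n4", "n5"]

def same_cols(c):
    # build an array with the name of every other column whose value matches (null otherwise)
    matches = F.filter(
        F.array(*[F.when(F.col(o) == F.col(c), F.lit(o)) for o in cols if o != c]),
        lambda x: x.isNotNull(),
    )
    # turn an empty array into null so the output mirrors the RDD version
    return F.when(F.size(matches) > 0, matches).alias(c + "_same")

df_new.select("age", *[same_cols(c) for c in cols]).show(truncate=False)

Keeping everything in column expressions lets Catalyst optimize the query, whereas the rdd.map approach serializes each row through Python.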