Joining with a lookup table in PySpark
I have 2 tables: Table 'A' and Table 'Lookup'.
Table A:
ID Day
A 1
B 1
C 2
D 4
The Lookup table holds a percentage value for each ID-Day combination.
Table Lookup:
ID 1 2 3 4
A 20 10 50 30
B 0 50 0 50
C 50 10 10 30
D 10 25 25 40
My expected output is Table 'A' with an additional field named 'Percent', populated with the matching values from the Lookup table:
ID Day Percent
A 1 20
B 1 0
C 2 10
D 4 40
Since both tables are large, I do not want to pivot either of them.
I have written the code in Scala. You can use it as a reference for Python.
scala> TableA.show()
+---+---+
| ID|Day|
+---+---+
|  A|  1|
|  B|  1|
|  C|  2|
|  D|  4|
+---+---+
scala> lookup.show()
+---+---+---+---+---+
| ID|  1|  2|  3|  4|
+---+---+---+---+---+
|  A| 20| 10| 50| 30|
|  B|  0| 50|  0| 50|
|  C| 50| 10| 10| 30|
|  D| 10| 25| 25| 40|
+---+---+---+---+---+
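import org.apache.spark.sql.Row
import spark.implicits._ // encoders needed by map and toDF (already in scope in spark-shell)
// Function to retrieve the value of column `s` from a joined row
val lookupUDF = (r: Row, s: String) => r.getAs[Any](s).toString
// Join over key column "ID"
val joindf = TableA.join(lookup, "ID")
// Final output DataFrame: for each row, read the column whose name equals the row's Day
val final_df = joindf.map(x => (x.getAs[Any]("ID").toString, x.getAs[Any]("Day").toString, lookupUDF(x, x.getAs[Any]("Day").toString))).toDF("ID", "Day", "Percentage")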
final_df.show()
+---+---+----------+
| ID|Day|Percentage|
+---+---+----------+
|  A|  1|        20|
|  B|  1|         0|
|  C|  2|        10|
|  D|  4|        40|
+---+---+----------+
(Posting my own answer a day after I posted the question.)
I was able to solve this by converting the table to a pandas DataFrame.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([StructField("id", StringType()),
                     StructField("day", StringType()),
                     StructField("1", IntegerType()),
                     StructField("2", IntegerType()),
                     StructField("3", IntegerType()),
                     StructField("4", IntegerType())])
# The day field is a string so that its value matches the column names "1" to "4"
data = [['A', '1', 20, 10, 50, 30], ['B', '1', 0, 50, 0, 50], ['C', '2', 50, 10, 10, 30], ['D', '4', 10, 25, 25, 40]]
df = spark.createDataFrame(data, schema=schema)
df.show()
# After joining the 2 tables on "id", the tables would look like this:
+---+---+---+---+---+---+
| id|day|  1|  2|  3|  4|
+---+---+---+---+---+---+
|  A|  1| 20| 10| 50| 30|
|  B|  1|  0| 50|  0| 50|
|  C|  2| 50| 10| 10| 30|
|  D|  4| 10| 25| 25| 40|
+---+---+---+---+---+---+
# Converting to a pandas dataframe
pandas_df = df.toPandas()
id day 1 2 3 4
A 1 20 10 50 30
B 1 0 50 0 50
C 2 50 10 10 30
D 4 10 25 25 40
# Row-wise lookup function: use each row's day value as the column label to read
def lookup_percent(x):
    return x[x['day']]
pandas_df['percent'] = pandas_df.apply(lookup_percent, axis=1)
# Converting back to a Spark DF:
spark_df = spark.createDataFrame(pandas_df)
+---+---+---+---+---+---+-------+
| id|day|  1|  2|  3|  4|percent|
+---+---+---+---+---+---+-------+
|  A|  1| 20| 10| 50| 30|     20|
|  B|  1|  0| 50|  0| 50|      0|
|  C|  2| 50| 10| 10| 30|     10|
|  D|  4| 10| 25| 25| 40|     40|
+---+---+---+---+---+---+-------+
spark_df.select("id", "day", "percent").show()
+---+---+-------+
| id|day|percent|
+---+---+-------+
|  A|  1|     20|
|  B|  1|      0|
|  C|  2|     10|
|  D|  4|     40|
+---+---+-------+
I would appreciate it if someone could post an answer in PySpark that does not need the pandas DataFrame conversion.
from pyspark.sql.functions import col

sc = spark.sparkContext

df = spark.createDataFrame([{'ID':'A','Day':1}
                           ,{'ID':'B','Day':1}
                           ,{'ID':'C','Day':2}
                           ,{'ID':'D','Day':4}])
df1 = spark.createDataFrame([{'ID':'A','1':20,'2':10,'3':50,'4':30},
                             {'ID':'B','1':0,'2':50,'3':0,'4':50},
                             {'ID':'C','1':50,'2':10,'3':10,'4':30},
                             {'ID':'D','1':10,'2':25,'3':25,'4':40}
                             ])
# Cast the inferred long columns to int
df1 = df1.withColumn('1',col('1').cast('int')).withColumn('2',col('2').cast('int')).withColumn('3',col('3').cast('int')).withColumn('4',col('4').cast('int'))
df = df.withColumn('Day',col('Day').cast('int'))
# Join on the key column so each row carries Day plus all lookup columns
df_final = df.join(df1,'ID')
df_final_rdd = df_final.rdd
print(df_final_rdd.collect())

def create_list(r, s):
    # Use the Day value (as a string) as the name of the column to read
    s = str(s)
    k = (r['ID'], r['Day'], r[s])
    return k

# Note: collect() brings every joined row to the driver
l = []
for element in df_final_rdd.collect():
    l.append(create_list(element, element['Day']))
rdd = sc.parallelize(l)
df = spark.createDataFrame(rdd).toDF('ID','Day','Percent')