Pyspark 根据列值复制行
Pyspark Replicate Row based on column value
我想根据每行给定列的值复制我的 DataFrame 中的所有行,然后为每个新行编制索引。假设我有:
Column A Column B
T1 3
T2 2
我想要的结果是:
Column A Column B Index
T1 3 1
T1 3 2
T1 3 3
T2 2 1
T2 2 2
我能够使用固定值进行类似的操作,但不能使用在该列中找到的信息。我当前的固定值工作代码是:
idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array( idx ) ))
我尝试更改:
lit(i) for i in range(1, 10)
到
lit(i) for i in range(1, df['Column B'])
并将其添加到我的 array() 函数中:
df = df.withColumn('Index', explode(array( lit(i) for i in range(1, df['Column B']) ) ))
但它不起作用(类型错误:'Column' 对象不能被解释为整数)。
我该如何实施?
很遗憾,您不能 那样。您始终可以使用 udf
,但我确实有一个非 udf hack 解决方案,如果您使用的是 Spark 2.1 或更高版本,它应该适合您。
诀窍是利用pyspark.sql.functions.posexplode()
获取索引值。我们通过重复一个逗号 Column B
次来创建一个字符串来做到这一点。然后我们在逗号上分割这个字符串,并使用 posexplode
来获取索引。
df.createOrReplaceTempView("df") # first register the DataFrame as a temp table
query = 'SELECT '\
'`Column A`,'\
'`Column B`,'\
'pos AS Index '\
'FROM ( '\
'SELECT DISTINCT '\
'`Column A`,'\
'`Column B`,'\
'posexplode(split(repeat(",", `Column B`), ",")) '\
'FROM df) AS a '\
'WHERE a.pos > 0'
newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#| T1| 3| 1|
#| T1| 3| 2|
#| T1| 3| 3|
#| T2| 2| 1|
#| T2| 2| 2|
#+--------+--------+-----+
注意:您需要将列名用反引号括起来,因为它们中有空格,如 post 中所述:
You can try this:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql import functions as F
df = spark.read.csv('/FileStore/tables/stack1.csv', header = 'True', inferSchema = 'True')
w = Window.orderBy("Column A")
df = df.select(row_number().over(w).alias("Index"), col("*"))
n_to_array = udf(lambda n : [n] * n ,ArrayType(IntegerType()))
df2 = df.withColumn('Column B', n_to_array('Column B'))
df3= df2.withColumn('Column B', explode('Column B'))
df3.show()
我想根据每行给定列的值复制我的 DataFrame 中的所有行,然后为每个新行编制索引。假设我有:
Column A Column B
T1 3
T2 2
我想要的结果是:
Column A Column B Index
T1 3 1
T1 3 2
T1 3 3
T2 2 1
T2 2 2
我能够使用固定值进行类似的操作,但不能使用在该列中找到的信息。我当前的固定值工作代码是:
idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array( idx ) ))
我尝试更改:
lit(i) for i in range(1, 10)
到
lit(i) for i in range(1, df['Column B'])
并将其添加到我的 array() 函数中:
df = df.withColumn('Index', explode(array( lit(i) for i in range(1, df['Column B']) ) ))
但它不起作用(类型错误:'Column' 对象不能被解释为整数)。
我该如何实施?
很遗憾,您不能 udf
,但我确实有一个非 udf hack 解决方案,如果您使用的是 Spark 2.1 或更高版本,它应该适合您。
诀窍是利用pyspark.sql.functions.posexplode()
获取索引值。我们通过重复一个逗号 Column B
次来创建一个字符串来做到这一点。然后我们在逗号上分割这个字符串,并使用 posexplode
来获取索引。
df.createOrReplaceTempView("df") # first register the DataFrame as a temp table
query = 'SELECT '\
'`Column A`,'\
'`Column B`,'\
'pos AS Index '\
'FROM ( '\
'SELECT DISTINCT '\
'`Column A`,'\
'`Column B`,'\
'posexplode(split(repeat(",", `Column B`), ",")) '\
'FROM df) AS a '\
'WHERE a.pos > 0'
newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#| T1| 3| 1|
#| T1| 3| 2|
#| T1| 3| 3|
#| T2| 2| 1|
#| T2| 2| 2|
#+--------+--------+-----+
注意:您需要将列名用反引号括起来,因为它们中有空格,如 post 中所述:
You can try this:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql import functions as F
df = spark.read.csv('/FileStore/tables/stack1.csv', header = 'True', inferSchema = 'True')
w = Window.orderBy("Column A")
df = df.select(row_number().over(w).alias("Index"), col("*"))
n_to_array = udf(lambda n : [n] * n ,ArrayType(IntegerType()))
df2 = df.withColumn('Column B', n_to_array('Column B'))
df3= df2.withColumn('Column B', explode('Column B'))
df3.show()