PySpark: Sorting a DataFrame using a column
So I have two questions that I think should be basic for someone with PySpark experience, but I can't seem to solve them.
Sample entries from my csv file are:
"dfg.AAIXpWU4Q","1"
"cvbc.AAU3aXfQ","1"
"T-L5aw0L1uT_OfFyzbk","1"
"D9TOXY7rA_LsnvwQa-awVk","2"
"JWg8_0lGDA7OCwWcH_9aDc","2"
"ewrq.AAbRaACr2tVh5wA","1"
"ewrq.AALJWAAC-Qku3heg","1"
"ewrq.AADStQqmhJ7A","2"
"ewrq.AAEAABh36oHUNA","1"
"ewrq.AALJABfV5u-7Yg","1"
I created the following dataframe:
>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...| "7"|
|"yDQ...| "1"|
|"qUU...|"13"|
+-------+----+
only showing top 3 rows
First, is this the right way to cast the hits column to IntegerType()? And why do all the values become null?
>>> df2 = df2.withColumn("hits", df2["hits"].cast(IntegerType()))
>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...|null|
|"yDQ...|null|
|"qUU...|null|
+-------+----+
only showing top 3 rows
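I suspect the parsed values still contain the literal double quotes (the string "7", not the number 7), and a string that is not a valid integer casts to null. A rough sketch of stripping the quotes before the cast, assuming that is indeed the problem:
>>> from pyspark.sql.functions import regexp_replace, col
>>> # strip the embedded double quotes, then cast the remaining digits to an integer
>>> df2 = df2.withColumn("hits", regexp_replace(col("hits"), '"', '').cast("integer"))
>>> df2.show(3)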
Second, I need to sort the data in descending order of the hits column, so I tried this:
>>> df1 = df2.sort(col('hits').desc())
>>> df1.show(20)
But I get the following error:
java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 2 fields are required while 18 values are provided.
I'm guessing this is because of how I created the dataframe:
>>> rdd = sc.textFile("/path/to/file/*")
>>> rdd.take(2)
['"7wAfdgdfgd","7"', '"1x3Qdfgdf","1"']
>>> my_df = rdd.map(lambda x: (x.split(","))).toDF()
>>> df2 = my_df.selectExpr("_1 as user_id", "_2 as hits")
>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...| "7"|
|"yDQ...| "1"|
|"qUU...|"13"|
+-------+----+
only showing top 3 rows
And I'm guessing that some rows contain extra commas. How do I avoid that, or what is the best way to read this file?
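One option I am considering is Spark's built-in CSV reader, which handles the quoting (and commas inside quoted fields) itself. A sketch, assuming the files are plain quoted CSV:
>>> from pyspark.sql.types import StructType, StructField, StringType
>>> schema = StructType([StructField("user_id", StringType(), True), StructField("hits", StringType(), True)])
>>> # header=False assumed; adjust if some files carry a header row
>>> df2 = spark.read.csv("/path/to/file/*", schema=schema, header=False)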
Update -- adding the file read and split
Going by the sample above, I created a file like this:
'"7wAfdgdfgd","7"'
'"1x3Qdfgdf","1"'
'"13xxyyzzsdff","13"'
-- note that the ' makes each line a single string
Now the code to read it:
scala> val myRdd = sc.textFile("test_file.dat")
myRdd: org.apache.spark.rdd.RDD[String] = test_file.dat MapPartitionsRDD[1] at textFile at <console>:24
// note the element type of the RDD here -- it is String
// we need an RDD of (String, String) tuples to convert it into a DataFrame
scala> myRdd.map(x => x.replace("'","")).map(x => x.split(",")).map( x => (x(0),x(1)))
res0: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at map at <console>:26
// Finally
scala> myRdd.map(x => x.replace("'","")).map(x => x.split(",")).map( x => (x(0),x(1))).toDF("user_id","hits").show(false)
+--------------+----+
|user_id |hits|
+--------------+----+
|"7wAfdgdfgd" |"7" |
|"1x3Qdfgdf" |"1" |
|"13xxyyzzsdff"|"13"|
+--------------+----+
End of update
Since you are new to this (or otherwise), my recommendation/practice is to run actual ANSI SQL rather than pyspark.sql.functions. It is easier to maintain, and using sql.functions gives you no advantage over ANSI SQL.
Obviously, you still need to know the SQL/column functions Spark provides; in this answer I use split, orderBy, and cast.
Since you didn't provide the contents of the text file, here is my take, with all three answers rolled into one SQL statement:
myDf = spark.createDataFrame([("abc","7"),("xyz","18"),("lmn","4,xyz")],schema=["user_id","hits"])
myDf.show(20,False)
+-------+-----+
|user_id|hits |
+-------+-----+
|abc |7 |
|xyz |18 |
|lmn |4,xyz|
+-------+-----+
myDf.createOrReplaceTempView("hits_table")
SQL + result
spark.sql("select user_id, cast(split(hits,',')[0] as integer) as hits from hits_table order by hits desc ").show(20,False)
+-------+----+
|user_id|hits|
+-------+----+
|xyz |18 |
|abc |7 |
|lmn |4 |
+-------+----+
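If you do prefer the DataFrame API, the same logic with the split, cast, and orderBy mentioned above looks roughly like this (a sketch, not run against your data):
from pyspark.sql.functions import split, col
# same logic as the SQL above: keep the part before any stray comma,
# cast it to an integer, and sort in descending order
myDf.withColumn("hits", split(col("hits"), ",")[0].cast("integer")) \
    .orderBy(col("hits").desc()) \
    .show(20, False)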
So, w.r.t. @SanBan's answer, I arrived at the following:
>>> rdd = sc.textFile("/home/jsanghvi/work/buffer/*")
>>> schema = StructType([StructField ("user_id", StringType(), True), StructField ("hits", StringType(), True)])
>>> my_rdd = rdd.map(lambda x: x.replace("'","")).map(lambda x: x.split(",")).map(lambda x: (x[0],x[1]))
>>> my_rdd2 = my_rdd.map(lambda x: str(x).replace("'","").replace("(", "").replace(")", "")).map(lambda x: x.split(",")).map(lambda x: (x[0],x[1]))
>>> df1 = spark.createDataFrame(my_rdd2, schema)
>>> dfx = df1.sort(col('hits').desc())
>>> dfx.show(5)
+----------------+--------------------+
| user_id| hits|
+----------------+--------------------+
|"AUDIO_AUTO_PLAY| EXPANDABLE_AUTOM...|
| "user_id"| "_col1"|
| "AAESjk66lDk...| "9999"|
| "ABexsk6sLlc...| "9999"|
| "AAgb1k65pHI...| "9999"|
+----------------+--------------------+
# removing garbage rows
>>> dfx = dfx.filter(~col("hits").isin(["_col1", "EXPANDABLE_AUTOM..."]))
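Finally, hits is still a quoted string at this point, so for a numeric sort I strip the quotes and cast first (a rough sketch):
>>> from pyspark.sql.functions import regexp_replace
>>> # strip the leftover double quotes, cast to int, and sort numerically
>>> dfx = dfx.withColumn("hits", regexp_replace(col("hits"), '"', '').cast("integer"))
>>> dfx = dfx.sort(col("hits").desc())
>>> dfx.show(5)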