如何 select 最后一行以及如何按索引访问 PySpark 数据帧?
How to select last row and also how to access PySpark dataframe by index?
来自像
这样的 PySpark SQL 数据框
name age city
abc 20 A
def 30 B
如何获取最后一行。(就像 df.limit(1) 我可以将数据帧的第一行放入新数据帧)。
以及如何通过 index.like 行号访问数据框行。 12 或 200 .
在pandas我能做到
df.tail(1) # for last row
df.ix[rowno or index] # by index
df.loc[] or by df.iloc[]
我只是好奇如何以这种方式或替代方式访问 pyspark 数据框。
谢谢
How to get the last row.
漫长而丑陋的方式,假设所有列都是可订购的:
from pyspark.sql.functions import (
col, max as max_, struct, monotonically_increasing_id
)
last_row = (df
.withColumn("_id", monotonically_increasing_id())
.select(max(struct("_id", *df.columns))
.alias("tmp")).select(col("tmp.*"))
.drop("_id"))
如果不是所有的列都可以排序你可以试试:
with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]
with_id.where(col("_id") == i).drop("_id")
注意。 pyspark.sql.functions
/`o.a.s.sql.functions中有last
的功能,但考虑到description of the corresponding expressions这里不是一个好的选择。
how can I access the dataframe rows by index.like
你不能。 Spark DataFrame
并可通过索引访问。 稍后过滤。请记住这个 O(N) 操作。
使用下面得到一个包含单调递增、唯一、和个连续整数的索引列,即不是 monotonically_increasing_id()
是如何工作的。索引将按照与 DataFrame 的 colName
相同的顺序升序。
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df = df\
.withColumn('int', F.lit(1))\
.withColumn('index', F.sum('int').over(window))\
.drop('int')\
使用以下代码查看 DataFrame 的尾部或最后 rownums
。
rownums = 10
df.where(F.col('index')>df.count()-rownums).show()
使用以下代码查看 DataFrame 从 start_row
到 end_row
的行。
start_row = 20
end_row = start_row + 10
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show()
zipWithIndex()
是一种 RDD 方法,它执行 return 单调递增、唯一且连续的整数,但以一种可以返回到原始 DataFrame 的方式实现起来似乎要慢得多用 id 列修改。
How to get the last row.
如果您有一列可用于排序数据框,例如 "index",那么获取最后一条记录的一种简单方法是使用 SQL:
1) 按降序排列 table 和
2) 从此订单中获取第一个值
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()
And how can I access the dataframe rows by index.like row no. 12 or 200 .
类似的方法你可以在任何行中获取记录
row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (select * from table_df ORDER BY index ASC limit {0}) ord_lim ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec.format(row_number))
latest_rec.show()
如果您没有 "index" 列,您可以使用
创建它
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("index", monotonically_increasing_id())
from pyspark.sql import functions as F
expr = [F.last(col).alias(col) for col in df.columns]
df.agg(*expr)
小提示: 看起来你仍然有使用 pandas 或 R 的人的心态。Spark 是我们工作方式的不同范例与数据。您不再访问单个单元格内的数据,现在您可以处理其中的整个块。如果你像刚才那样继续收集东西和做动作,你就会失去 spark 提供的并行性的整个概念。查看 Spark 中转换与操作的概念。
来自像
这样的 PySpark SQL 数据框name age city
abc 20 A
def 30 B
如何获取最后一行。(就像 df.limit(1) 我可以将数据帧的第一行放入新数据帧)。
以及如何通过 index.like 行号访问数据框行。 12 或 200 .
在pandas我能做到
df.tail(1) # for last row
df.ix[rowno or index] # by index
df.loc[] or by df.iloc[]
我只是好奇如何以这种方式或替代方式访问 pyspark 数据框。
谢谢
How to get the last row.
漫长而丑陋的方式,假设所有列都是可订购的:
from pyspark.sql.functions import (
col, max as max_, struct, monotonically_increasing_id
)
last_row = (df
.withColumn("_id", monotonically_increasing_id())
.select(max(struct("_id", *df.columns))
.alias("tmp")).select(col("tmp.*"))
.drop("_id"))
如果不是所有的列都可以排序你可以试试:
with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]
with_id.where(col("_id") == i).drop("_id")
注意。 pyspark.sql.functions
/`o.a.s.sql.functions中有last
的功能,但考虑到description of the corresponding expressions这里不是一个好的选择。
how can I access the dataframe rows by index.like
你不能。 Spark DataFrame
并可通过索引访问。
使用下面得到一个包含单调递增、唯一、和个连续整数的索引列,即不是 monotonically_increasing_id()
是如何工作的。索引将按照与 DataFrame 的 colName
相同的顺序升序。
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df = df\
.withColumn('int', F.lit(1))\
.withColumn('index', F.sum('int').over(window))\
.drop('int')\
使用以下代码查看 DataFrame 的尾部或最后 rownums
。
rownums = 10
df.where(F.col('index')>df.count()-rownums).show()
使用以下代码查看 DataFrame 从 start_row
到 end_row
的行。
start_row = 20
end_row = start_row + 10
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show()
zipWithIndex()
是一种 RDD 方法,它执行 return 单调递增、唯一且连续的整数,但以一种可以返回到原始 DataFrame 的方式实现起来似乎要慢得多用 id 列修改。
How to get the last row.
如果您有一列可用于排序数据框,例如 "index",那么获取最后一条记录的一种简单方法是使用 SQL: 1) 按降序排列 table 和 2) 从此订单中获取第一个值
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()
And how can I access the dataframe rows by index.like row no. 12 or 200 .
类似的方法你可以在任何行中获取记录
row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (select * from table_df ORDER BY index ASC limit {0}) ord_lim ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec.format(row_number))
latest_rec.show()
如果您没有 "index" 列,您可以使用
创建它from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("index", monotonically_increasing_id())
from pyspark.sql import functions as F
expr = [F.last(col).alias(col) for col in df.columns]
df.agg(*expr)
小提示: 看起来你仍然有使用 pandas 或 R 的人的心态。Spark 是我们工作方式的不同范例与数据。您不再访问单个单元格内的数据,现在您可以处理其中的整个块。如果你像刚才那样继续收集东西和做动作,你就会失去 spark 提供的并行性的整个概念。查看 Spark 中转换与操作的概念。