从 spark 数据框中获取特定行
get specific row from spark dataframe
scala spark 数据帧中是否有 df[100, c("column")]
的替代方案。我想 select 火花数据框列中的特定行。
例如 100th
上面 R 等效代码中的行
首先,你必须了解DataFrames
是分布式的,这意味着你不能以典型的过程方式访问它们,你必须先进行分析。虽然,您问的是 Scala
,但我建议您阅读 Pyspark Documentation,因为它的示例比任何其他文档都多。
但是,继续我的解释,我会使用 RDD
API 的一些方法,因为所有 DataFrame
都有一个 RDD
作为属性。请看下面我的示例,并注意我是如何记录第二条记录的。
df = sqlContext.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])
myIndex = 1
values = (df.rdd.zipWithIndex()
.filter(lambda ((l, v), i): i == myIndex)
.map(lambda ((l,v), i): (l, v))
.collect())
print(values[0])
# (u'b', 2)
希望有人能用更少的步骤给出另一个解决方案。
这就是我在 Scala 中实现相同目标的方式。我不确定它是否比有效答案更有效,但它需要更少的编码
val parquetFileDF = sqlContext.read.parquet("myParquetFule.parquet")
val myRow7th = parquetFileDF.rdd.take(7).last
下面的 getrows()
函数应该可以获取您想要的特定行。
为了完整起见,我写下了完整的代码以重现输出。
# Create SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('scratch').getOrCreate()
# Create the dataframe
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])
# Function to get rows at `rownums`
def getrows(df, rownums=None):
return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])
# Get rows at positions 0 and 2.
getrows(df, rownums=[0, 2]).collect()
# Output:
#> [(Row(letter='a', name=1)), (Row(letter='c', name=3))]
有一个scala方法(如果你有足够的工作机器内存):
val arr = df.select("column").rdd.collect
println(arr(100))
如果数据框架构未知,并且您知道 "column"
字段的实际类型(例如双精度),那么您可以获得 arr
如下:
val arr = df.select($"column".cast("Double")).as[Double].rdd.collect
在 PySpark 中,如果您的数据集很小(可以装入驱动程序的内存),您可以这样做
df.collect()[n]
其中 df
是 DataFrame 对象,n
是感兴趣的行。获取所述行后,您可以执行 row.myColumn
或 row["myColumn"]
来获取内容,如 API docs.
中所述
您可以使用下面的一行代码简单地做到这一点
val arr = df.select("column").collect()(99)
以下是 Java-Spark 的实现方式,1) 添加顺序递增的列。 2) Select行号使用Id。 3) 删除专栏
import static org.apache.spark.sql.functions.*;
..
ds = ds.withColumn("rownum", functions.monotonically_increasing_id());
ds = ds.filter(col("rownum").equalTo(99));
ds = ds.drop("rownum");
N.B。 monotonically_increasing_id从0开始;
当你想从dataframe中获取日期列的最大值时,只获取没有对象类型或Row对象信息的值,你可以参考下面的代码。
table = "我的table"
max_date = df.select(最大('date_col')).first()[0]
2020-06-26
instead of Row(max(reference_week)=datetime.date(2020, 6, 26))
这在 PySpark 中对我有用
df.select("column").collect()[0][0]
scala spark 数据帧中是否有 df[100, c("column")]
的替代方案。我想 select 火花数据框列中的特定行。
例如 100th
上面 R 等效代码中的行
首先,你必须了解DataFrames
是分布式的,这意味着你不能以典型的过程方式访问它们,你必须先进行分析。虽然,您问的是 Scala
,但我建议您阅读 Pyspark Documentation,因为它的示例比任何其他文档都多。
但是,继续我的解释,我会使用 RDD
API 的一些方法,因为所有 DataFrame
都有一个 RDD
作为属性。请看下面我的示例,并注意我是如何记录第二条记录的。
df = sqlContext.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])
myIndex = 1
values = (df.rdd.zipWithIndex()
.filter(lambda ((l, v), i): i == myIndex)
.map(lambda ((l,v), i): (l, v))
.collect())
print(values[0])
# (u'b', 2)
希望有人能用更少的步骤给出另一个解决方案。
这就是我在 Scala 中实现相同目标的方式。我不确定它是否比有效答案更有效,但它需要更少的编码
val parquetFileDF = sqlContext.read.parquet("myParquetFule.parquet")
val myRow7th = parquetFileDF.rdd.take(7).last
下面的 getrows()
函数应该可以获取您想要的特定行。
为了完整起见,我写下了完整的代码以重现输出。
# Create SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('scratch').getOrCreate()
# Create the dataframe
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])
# Function to get rows at `rownums`
def getrows(df, rownums=None):
return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])
# Get rows at positions 0 and 2.
getrows(df, rownums=[0, 2]).collect()
# Output:
#> [(Row(letter='a', name=1)), (Row(letter='c', name=3))]
有一个scala方法(如果你有足够的工作机器内存):
val arr = df.select("column").rdd.collect
println(arr(100))
如果数据框架构未知,并且您知道 "column"
字段的实际类型(例如双精度),那么您可以获得 arr
如下:
val arr = df.select($"column".cast("Double")).as[Double].rdd.collect
在 PySpark 中,如果您的数据集很小(可以装入驱动程序的内存),您可以这样做
df.collect()[n]
其中 df
是 DataFrame 对象,n
是感兴趣的行。获取所述行后,您可以执行 row.myColumn
或 row["myColumn"]
来获取内容,如 API docs.
您可以使用下面的一行代码简单地做到这一点
val arr = df.select("column").collect()(99)
以下是 Java-Spark 的实现方式,1) 添加顺序递增的列。 2) Select行号使用Id。 3) 删除专栏
import static org.apache.spark.sql.functions.*;
..
ds = ds.withColumn("rownum", functions.monotonically_increasing_id());
ds = ds.filter(col("rownum").equalTo(99));
ds = ds.drop("rownum");
N.B。 monotonically_increasing_id从0开始;
当你想从dataframe中获取日期列的最大值时,只获取没有对象类型或Row对象信息的值,你可以参考下面的代码。
table = "我的table"
max_date = df.select(最大('date_col')).first()[0]
2020-06-26
instead of Row(max(reference_week)=datetime.date(2020, 6, 26))
这在 PySpark 中对我有用
df.select("column").collect()[0][0]