在 Spark 数据框列中获取最大值的最佳方法
Best way to get the max value in a Spark dataframe column
我正在尝试找出在 Spark 数据帧列中获取最大值的最佳方法。
考虑以下示例:
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
创建:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
我的目标是找到 A 列中的最大值(经检查,这是 3.0)。使用 PySpark,这里有四种我能想到的方法:
# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])
# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']
# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
以上每个都给出了正确的答案,但在没有 Spark 分析工具的情况下,我无法判断哪个是最好的。
关于上述哪种方法在 Spark 运行时或资源使用方面最有效,或者是否有比上述方法更直接的方法,从直觉或经验主义的任何想法?
>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor| timestamp| uid| x| y|
+-----+--------------------+--------+----------+-----------+
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|
>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613
答案和方法三差不多。但似乎可以删除 method3 中的 "asDict()"
如果有人想知道如何使用 Scala(使用 Spark 2.0.+)做到这一点,请看这里:
scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
collect()(0).getInt(0)
scala> print(myMax)
117
数据框特定列的最大值可以通过使用 -
来实现
your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
备注:Spark 旨在处理大数据 - 分布式计算。示例DataFrame的大小非常小,因此实际示例的顺序可以相对于小示例进行更改。
最慢:Method_1,因为 .describe("A")
计算最小值、最大值、平均值、stddev 和计数(对整列进行 5 次计算)。
中:Method_4,因为,.rdd
(DF 到 RDD 转换)减慢了过程。
更快:Method_3 ~ Method_2 ~ Method_5,因为逻辑非常相似,所以Spark的催化剂优化器遵循非常相似的逻辑,操作次数最少(get max of a特定列,收集单值数据框;.asDict()
比较 2、3 与 5)
增加了一点额外时间
import pandas as pd
import time
time_dict = {}
dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#-- For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = self.sqlContext.createDataFrame(pdf)
tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)
tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)
tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)
tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)
tic5 = int(round(time.time() * 1000))
# Method 5: Use agg()
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)
print time_dict
集群边缘节点上的结果,以毫秒 (ms) 为单位:
小DF(毫秒):{'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}
更大的 DF(毫秒):{'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916, 'm5': 373}
我认为最好的解决方案是使用 head()
考虑你的例子:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
使用 python 的 agg 和 max 方法,我们可以得到如下值:
from pyspark.sql.functions import max
df.agg(max(df.A)).head()[0]
这将 return:
3.0
确保导入正确:
from pyspark.sql.functions import max
我们这里使用的max函数是pySParksql库函数,不是python.
默认的max函数
另一种方法:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
根据我的数据,我得到了这个基准:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s
df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s
df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s
他们都给出了相同的答案
这是一种懒惰的方法,只需进行计算统计:
df.write.mode("overwrite").saveAsTable("sampleStats")
Query = "ANALYZE TABLE sampleStats COMPUTE STATISTICS FOR COLUMNS " + ','.join(df.columns)
spark.sql(Query)
df.describe('ColName')
或
spark.sql("Select * from sampleStats").describe('ColName')
或者您可以打开一个配置单元 shell 和
describe formatted table sampleStats;
您将在属性中看到统计信息 - 最小值、最大值、不同值、空值等。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val testDataFrame = Seq(
(1.0, 4.0), (2.0, 5.0), (3.0, 6.0)
).toDF("A", "B")
val (maxA, maxB) = testDataFrame.select(max("A"), max("B"))
.as[(Double, Double)]
.first()
println(maxA, maxB)
结果是(3.0,6.0),和testDataFrame.agg(max($"A"), max($"B")).collect()(0)
一样。但是,testDataFrame.agg(max($"A"), max($"B")).collect()(0)
returns一个List,[3.0,6.0]
在 pyspark 中你可以这样做:
max(df.select('ColumnName').rdd.flatMap(lambda x: x).collect())
下面的示例展示了如何获取 Spark 数据帧列中的最大值。
from pyspark.sql.functions import max
df = sql_context.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
result = df.select([max("A")]).show()
result.show()
+------+
|max(A)|
+------+
| 3.0|
+------+
print result.collect()[0]['max(A)']
3.0
同样min,mean等可以计算如下:
from pyspark.sql.functions import mean, min, max
result = df.select([mean("A"), min("A"), max("A")])
result.show()
+------+------+------+
|avg(A)|min(A)|max(A)|
+------+------+------+
| 2.0| 1.0| 3.0|
+------+------+------+
首先添加导入行:
from pyspark.sql.functions import min, max
在数据框中找到年龄的最小值:
df.agg(min("age")).show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
在数据框中找到年龄的最大值:
df.agg(max("age")).show()
+--------+
|max(age)|
+--------+
| 77|
+--------+
要获取值,请使用其中任何一个
df1.agg({"x": "max"}).collect()[0][0]
df1.agg({"x": "max"}).head()[0]
df1.agg({"x": "max"}).first()[0]
或者我们可以为 'min'
做这些
from pyspark.sql.functions import min, max
df1.agg(min("id")).collect()[0][0]
df1.agg(min("id")).head()[0]
df1.agg(min("id")).first()[0]
我使用了这个链中已经存在的另一个解决方案(@satprem rath)。
要在数据框中找到年龄的最小值:
df.agg(min("age")).show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
编辑:添加更多上下文。
虽然上述方法打印了结果,但我在将结果分配给变量以便稍后重用时遇到了问题。
因此,要仅获取分配给变量的 int
值:
from pyspark.sql.functions import max, min
maxValueA = df.agg(max("A")).collect()[0][0]
maxValueB = df.agg(max("B")).collect()[0][0]
我正在尝试找出在 Spark 数据帧列中获取最大值的最佳方法。
考虑以下示例:
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
创建:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
我的目标是找到 A 列中的最大值(经检查,这是 3.0)。使用 PySpark,这里有四种我能想到的方法:
# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])
# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']
# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
以上每个都给出了正确的答案,但在没有 Spark 分析工具的情况下,我无法判断哪个是最好的。
关于上述哪种方法在 Spark 运行时或资源使用方面最有效,或者是否有比上述方法更直接的方法,从直觉或经验主义的任何想法?
>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor| timestamp| uid| x| y|
+-----+--------------------+--------+----------+-----------+
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|
>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613
答案和方法三差不多。但似乎可以删除 method3 中的 "asDict()"
如果有人想知道如何使用 Scala(使用 Spark 2.0.+)做到这一点,请看这里:
scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
collect()(0).getInt(0)
scala> print(myMax)
117
数据框特定列的最大值可以通过使用 -
来实现your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
备注:Spark 旨在处理大数据 - 分布式计算。示例DataFrame的大小非常小,因此实际示例的顺序可以相对于小示例进行更改。
最慢:Method_1,因为 .describe("A")
计算最小值、最大值、平均值、stddev 和计数(对整列进行 5 次计算)。
中:Method_4,因为,.rdd
(DF 到 RDD 转换)减慢了过程。
更快:Method_3 ~ Method_2 ~ Method_5,因为逻辑非常相似,所以Spark的催化剂优化器遵循非常相似的逻辑,操作次数最少(get max of a特定列,收集单值数据框;.asDict()
比较 2、3 与 5)
import pandas as pd
import time
time_dict = {}
dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#-- For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = self.sqlContext.createDataFrame(pdf)
tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)
tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)
tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)
tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)
tic5 = int(round(time.time() * 1000))
# Method 5: Use agg()
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)
print time_dict
集群边缘节点上的结果,以毫秒 (ms) 为单位:
小DF(毫秒):{'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}
更大的 DF(毫秒):{'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916, 'm5': 373}
我认为最好的解决方案是使用 head()
考虑你的例子:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
使用 python 的 agg 和 max 方法,我们可以得到如下值:
from pyspark.sql.functions import max
df.agg(max(df.A)).head()[0]
这将 return:
3.0
确保导入正确:
from pyspark.sql.functions import max
我们这里使用的max函数是pySParksql库函数,不是python.
另一种方法:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
根据我的数据,我得到了这个基准:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s
df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s
df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s
他们都给出了相同的答案
这是一种懒惰的方法,只需进行计算统计:
df.write.mode("overwrite").saveAsTable("sampleStats")
Query = "ANALYZE TABLE sampleStats COMPUTE STATISTICS FOR COLUMNS " + ','.join(df.columns)
spark.sql(Query)
df.describe('ColName')
或
spark.sql("Select * from sampleStats").describe('ColName')
或者您可以打开一个配置单元 shell 和
describe formatted table sampleStats;
您将在属性中看到统计信息 - 最小值、最大值、不同值、空值等。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val testDataFrame = Seq(
(1.0, 4.0), (2.0, 5.0), (3.0, 6.0)
).toDF("A", "B")
val (maxA, maxB) = testDataFrame.select(max("A"), max("B"))
.as[(Double, Double)]
.first()
println(maxA, maxB)
结果是(3.0,6.0),和testDataFrame.agg(max($"A"), max($"B")).collect()(0)
一样。但是,testDataFrame.agg(max($"A"), max($"B")).collect()(0)
returns一个List,[3.0,6.0]
在 pyspark 中你可以这样做:
max(df.select('ColumnName').rdd.flatMap(lambda x: x).collect())
下面的示例展示了如何获取 Spark 数据帧列中的最大值。
from pyspark.sql.functions import max
df = sql_context.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
result = df.select([max("A")]).show()
result.show()
+------+
|max(A)|
+------+
| 3.0|
+------+
print result.collect()[0]['max(A)']
3.0
同样min,mean等可以计算如下:
from pyspark.sql.functions import mean, min, max
result = df.select([mean("A"), min("A"), max("A")])
result.show()
+------+------+------+
|avg(A)|min(A)|max(A)|
+------+------+------+
| 2.0| 1.0| 3.0|
+------+------+------+
首先添加导入行:
from pyspark.sql.functions import min, max
在数据框中找到年龄的最小值:
df.agg(min("age")).show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
在数据框中找到年龄的最大值:
df.agg(max("age")).show()
+--------+
|max(age)|
+--------+
| 77|
+--------+
要获取值,请使用其中任何一个
df1.agg({"x": "max"}).collect()[0][0]
df1.agg({"x": "max"}).head()[0]
df1.agg({"x": "max"}).first()[0]
或者我们可以为 'min'
做这些from pyspark.sql.functions import min, max
df1.agg(min("id")).collect()[0][0]
df1.agg(min("id")).head()[0]
df1.agg(min("id")).first()[0]
我使用了这个链中已经存在的另一个解决方案(@satprem rath)。
要在数据框中找到年龄的最小值:
df.agg(min("age")).show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
编辑:添加更多上下文。
虽然上述方法打印了结果,但我在将结果分配给变量以便稍后重用时遇到了问题。
因此,要仅获取分配给变量的 int
值:
from pyspark.sql.functions import max, min
maxValueA = df.agg(max("A")).collect()[0][0]
maxValueB = df.agg(max("B")).collect()[0][0]