Apply window function in Spark with non-constant frame size
My problem
I am currently struggling with Spark window functions. I am using Spark (through pyspark) version 1.6.3 (with Python version 2.6.6). I run a pyspark shell instance that automatically initializes a HiveContext as my sqlContext.
I want to do a rolling sum with a window function. My problem is that the window frame is not fixed: it depends on the observation under consideration. More specifically, I order the data by a variable called rank_id and, for any observation with index $x$, I want a rolling sum over the indexes between $x+1$ and $2x-1$ (for example, the row with rank_id = 3 should be summed over the rows with rank_id 4 and 5). My rangeBetween therefore has to depend on the value of the rank_id variable.
An important point is that I don't want to collect the data, so I cannot use anything like numpy (my data has many observations).
Reproducible example
from pyspark.mllib.random import RandomRDDs
import pyspark.sql.functions as psf
from pyspark.sql.window import Window
# Reproducible example
data = RandomRDDs.uniformVectorRDD(sc, 15, 2)
df = data.map(lambda l: (float(l[0]), float(l[1]))).toDF()
df = df.selectExpr("_1 as x", "_2 as y")
#df.show(2)
#+-------------------+------------------+
#| x| y|
#+-------------------+------------------+
#|0.32767742062486405|0.2506351566289311|
#| 0.7245348534550357| 0.597929853274274|
#+-------------------+------------------+
#only showing top 2 rows
# Finalize dataframe creation
w = Window().orderBy("x")
df = df.withColumn("rank_id", psf.rowNumber().over(w)).sort("rank_id")
#df.show(3)
#+--------------------+--------------------+-------+
#| x| y|rank_id|
#+--------------------+--------------------+-------+
#|0.016536160706045577|0.009892450530381458| 1|
#| 0.10943843181953838| 0.6478505849227775| 2|
#| 0.13916818312857027| 0.24165348228464578| 3|
#+--------------------+--------------------+-------+
#only showing top 3 rows
Fixed-width cumulative sum: no problem
With a window function, I can run a cumulative sum over a given number of indexes (I use rangeBetween here, but for this example rowsBetween could be used interchangeably).
w = Window.orderBy('rank_id').rangeBetween(-1,3)
df1 = df.select('*', psf.sum(df['y']).over(w).alias('roll1'))
#df1.show(3)
#+--------------------+--------------------+-------+------------------+
#| x| y|rank_id| roll1|
#+--------------------+--------------------+-------+------------------+
#|0.016536160706045577|0.009892450530381458| 1|0.9698521852602887|
#| 0.10943843181953838| 0.6478505849227775| 2|1.5744700156326066|
#| 0.13916818312857027| 0.24165348228464578| 3|2.3040547273760392|
#+--------------------+--------------------+-------+------------------+
#only showing top 3 rows
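As noted above, rowsBetween would work just as well here. A quick added check (not part of the original question): since rank_id increases by 1 from row to row, row offsets and range offsets coincide in this example.
# Same frame expressed with row offsets instead of range offsets
w_rows = Window.orderBy('rank_id').rowsBetween(-1, 3)
df1_rows = df.select('*', psf.sum(df['y']).over(w_rows).alias('roll1'))
# df1_rows.show(3) should match the 'roll1' column above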
Cumulative sum with a non-fixed width
I want to sum between indexes x+1 and 2x-1, where x is my row index. When I try to pass that to Spark (in a way similar to what we do for orderBy, which may be the problem), I get the following error:
# Now if I want to make rangeBetween size depend on a variable
w = Window.orderBy('rank_id').rangeBetween('rank_id'+1,2*'rank_id'-1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot concatenate 'str' and 'int' objects
I tried another way, using a SQL statement:
# Using SQL expression
df.registerTempTable('tempdf')
df2 = sqlContext.sql("""
SELECT *, SUM(y)
OVER (ORDER BY rank_id
RANGE BETWEEN rank_id+1 AND 2*rank_id-1) AS cumsum
FROM tempdf;
""")
which this time gives me the following error:
Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot recognize input near 'rank_id' '+' '1' in windowframeboundary; line 3 pos 15"
I also noticed that when I try a simpler statement using the SQL OVER clause, I get a similar error, which maybe means I am not passing the SQL statement correctly to Spark:
df2 = sqlContext.sql("""
SELECT *, SUM(y)
OVER (ORDER BY rank_id
RANGE BETWEEN -1 AND 1) AS cumsum
FROM tempdf;
""")
Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot recognize input near '-' '1' 'AND' in windowframeboundary; line 3 pos 15"
How could I solve my problem by using either a window function or a SQL statement within Spark?
TL;DR You cannot, or at least not in a scalable way with the current requirements. You can try something similar to sliding over RDDs.
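As a rough added sketch (my own illustration, not the approach the answer refers to), the same variable-width sum can also be written as a non-equi self-join on rank_id; Spark has to evaluate the join condition for every pair of rows, so it is no more scalable than a single-partition window:
import pyspark.sql.functions as psf
# Every target row 'a' sums the y of rows 'b' whose rank_id falls in
# [a.rank_id + 1, 2 * a.rank_id - 1]; rows with an empty frame get null.
a, b = df.alias('a'), df.alias('b')
rolled = (a.join(b,
                 (psf.col('b.rank_id') >= psf.col('a.rank_id') + 1) &
                 (psf.col('b.rank_id') <= 2 * psf.col('a.rank_id') - 1),
                 'left_outer')
           .groupBy(psf.col('a.rank_id'), psf.col('a.x'), psf.col('a.y'))
           .agg(psf.sum(psf.col('b.y')).alias('roll'))
           .orderBy('rank_id'))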
I also noticed that when I try a more simple statement using SQL OVER clause, I got a similar error which maybe means I am not passing SQL statement correctly to Spark
That is not correct. The range specification requires a (PRECEDING | FOLLOWING | CURRENT ROW) boundary. There should also be no semicolon:
SELECT *, SUM(x)
OVER (ORDER BY rank_id
RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
FROM tempdf
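For reference (an added snippet, not part of the original answer), the corrected statement should run against the temporary table registered earlier, and corresponds to Window.orderBy('rank_id').rangeBetween(-1, 1) in the DataFrame API:
# tempdf was registered above with df.registerTempTable('tempdf')
df_fixed = sqlContext.sql("""
    SELECT *, SUM(x)
    OVER (ORDER BY rank_id
          RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
    FROM tempdf
""")
df_fixed.show(3)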
I want to sum between indexes x+1 and 2x-1 where x is my row index. When I try to pass it to Spark (in similar way we do for orderBy maybe that's the problem), I got the following error ...
TypeError: cannot concatenate 'str' and 'int' objects
As the exception says, you cannot call + on a string and an integer. You probably want a column:
from pyspark.sql.functions import col
.rangeBetween(col('rank_id') + 1, 2* col('rank_id') - 1)
But this is not supported. The range has to be of a fixed size and cannot be defined in terms of an expression.
An important point is that I don't want to collect data
A window definition without partitionBy:
w = Window.orderBy('rank_id').rangeBetween(-1,3)
is as bad as collect, so even if there are workarounds for the "dynamic frame" problem (with a conditional and an unbounded window), they won't help you here.
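To illustrate that point, here is an added sketch with an artificial grouping column that the original data does not have (so it changes the meaning of the sum): with partitionBy, each group is processed by its own task, whereas the window above funnels every row through a single partition.
# Hypothetical 'grp' column, added only to show the shape of a
# partitioned window; partitioning changes which rows each frame sees.
df_g = df.withColumn('grp', psf.col('rank_id') % 3)
w_part = Window.partitionBy('grp').orderBy('rank_id').rangeBetween(-1, 3)
# Each 'grp' value is handled independently, so the work stays distributed,
# unlike the partitionBy-less window above.
df_part = df_g.select('*', psf.sum('y').over(w_part).alias('roll_in_grp'))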