Apply window function in Spark with non-constant frame size

My question

I am currently having trouble with Spark window functions. I am using Spark (via pyspark) version 1.6.3 (associated Python version 2.6.6). I run a pyspark shell instance that automatically initializes a HiveContext as my sqlContext.

I want to compute a rolling sum with a window function. My problem is that the window frame is not fixed: it depends on the observation under consideration. More specifically, I order the data by a variable called rank_id and, for any observation with index $x$, I want the rolling sum to run over the observations with indexes between $x+1$ and $2x-1$. My rangeBetween therefore has to depend on the rank_id value of the current row.
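
To make the frame concrete: for the row ranked 3 the frame covers ranks 4 and 5, and for the row ranked 5 it covers ranks 6 through 9. A small pure-Python sketch of the intended computation (my own illustration, not Spark code):

# Pure-Python illustration of the desired rolling sum (toy values of my own).
# For the row ranked x, sum y over the rows ranked x+1 .. 2x-1.
ys = [0.25, 0.60, 0.24, 0.11, 0.93, 0.37, 0.52]   # rank_id = 1..7

def frame_sum(x, values):
    lo, hi = x + 1, 2 * x - 1                     # frame bounds depend on x
    return sum(values[i - 1] for i in range(lo, hi + 1) if i <= len(values))

print([frame_sum(x, ys) for x in range(1, len(ys) + 1)])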

An important point is that I do not want to collect the data, so I cannot use something like numpy (my data has a large number of observations).

Reproducible example

from pyspark.mllib.random import RandomRDDs
import pyspark.sql.functions as psf
from pyspark.sql.window import Window

# Reproducible example
data = RandomRDDs.uniformVectorRDD(sc, 15, 2)
df = data.map(lambda l: (float(l[0]), float(l[1]))).toDF()
df = df.selectExpr("_1 as x", "_2 as y")

#df.show(2)
#+-------------------+------------------+                                        
#|                  x|                 y|
#+-------------------+------------------+
#|0.32767742062486405|0.2506351566289311|
#| 0.7245348534550357| 0.597929853274274|
#+-------------------+------------------+
#only showing top 2 rows

# Finalize dataframe creation
w = Window().orderBy("x")
df = df.withColumn("rank_id", psf.rowNumber().over(w)).sort("rank_id")
#df.show(3)
#+--------------------+--------------------+-------+                             
#|                   x|                   y|rank_id|
#+--------------------+--------------------+-------+
#|0.016536160706045577|0.009892450530381458|      1|
#| 0.10943843181953838|  0.6478505849227775|      2|
#| 0.13916818312857027| 0.24165348228464578|      3|
#+--------------------+--------------------+-------+
#only showing top 3 rows

Rolling sum with fixed width: no problem

With a window function, I can run a rolling sum over a fixed number of indexes (I use rangeBetween here, but for this example rowsBetween could be used interchangeably; see the variant after the output below).

w = Window.orderBy('rank_id').rangeBetween(-1,3)
df1 = df.select('*', psf.sum(df['y']).over(w).alias('roll1'))
#df1.show(3)
#+--------------------+--------------------+-------+------------------+          
#|                   x|                   y|rank_id|             roll1|
#+--------------------+--------------------+-------+------------------+
#|0.016536160706045577|0.009892450530381458|      1|0.9698521852602887|
#| 0.10943843181953838|  0.6478505849227775|      2|1.5744700156326066|
#| 0.13916818312857027| 0.24165348228464578|      3|2.3040547273760392|
#+--------------------+--------------------+-------+------------------+
#only showing top 3 rows
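
For reference, a rowsBetween variant (my addition; it matches the rangeBetween result here only because rank_id is a gap-free sequence of distinct consecutive integers):

w_rows = Window.orderBy('rank_id').rowsBetween(-1, 3)
df1_rows = df.select('*', psf.sum(df['y']).over(w_rows).alias('roll1_rows'))
# df1_rows should match df1 above: on consecutive ranks, a frame expressed in
# range terms covers exactly the same rows as one expressed in row terms.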

Rolling sum with non-fixed width

I want to sum between indexes x+1 and 2x-1, where x is my row index. When I try to pass this to Spark (in a way similar to what we do for orderBy, which may be the problem), I get the following error:

# Now if I want to make rangeBetween size depend on a variable
w = Window.orderBy('rank_id').rangeBetween('rank_id'+1,2*'rank_id'-1)

Traceback (most recent call last): File "", line 1, in TypeError: cannot concatenate 'str' and 'int' objects

I tried another approach, using an SQL statement:

# Using SQL expression
df.registerTempTable('tempdf')
df2 = sqlContext.sql("""
   SELECT *, SUM(y)
   OVER (ORDER BY rank_id
   RANGE BETWEEN rank_id+1 AND 2*rank_id-1) AS cumsum
   FROM tempdf;
""")

This time it gives me the following error:

Traceback (most recent call last): File "", line 6, in File "/opt/application/Spark/current/python/pyspark/sql/context.py", line >580, in sql return DataFrame(self._ssql_ctx.sql(sqlQuery), self) File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in call File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u"cannot recognize input near 'rank_id' '+' '1' in windowframeboundary; line 3 pos 15"

I also noticed that when I try a simpler statement using the SQL OVER clause, I get a similar error, which may mean I am not passing the SQL statement to Spark correctly:

df2 = sqlContext.sql("""
   SELECT *, SUM(y)
   OVER (ORDER BY rank_id
   RANGE BETWEEN -1 AND 1) AS cumsum
   FROM tempdf;
 """)

Traceback (most recent call last): File "", line 6, in File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql return DataFrame(self._ssql_ctx.sql(sqlQuery), self) File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in call File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u"cannot recognize input near '-' '1' 'AND' in windowframeboundary; line 3 pos 15"

How could I solve my problem by using either a window function or an SQL statement within Spark?

Answer

TL;DR You cannot, or at least not in a scalable way with the current requirements. You can try something similar to sliding over an RDD.

"I also noticed that when I try a more simple statement using SQL OVER clause, I got a similar error which maybe means I am not passing SQL statement correctly to Spark"

This is incorrect. The range specification requires a (PRECEDING | FOLLOWING | CURRENT ROW) boundary. There should also be no semicolon:

SELECT *, SUM(x)
OVER (ORDER BY rank_id
RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
FROM tempdf
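
For example, the corrected statement can be run through the same sqlContext used in the question (a sketch; df3 is a name of my own choosing, and note this query sums x whereas the earlier examples summed y):

# tempdf was registered from df in the question above
df3 = sqlContext.sql("""
   SELECT *, SUM(x)
   OVER (ORDER BY rank_id
   RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
   FROM tempdf
""")
df3.show(3)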

"I want to sum between indexes x+1 and 2x-1 where x is my row index. When I try to pass it to Spark (in similar way we do for orderBy maybe that's the problem), I got the following error ..."

TypeError: cannot concatenate 'str' and 'int' objects

As the exception says, you cannot call + on a string and an integer. You probably wanted columns:

from pyspark.sql.functions import col

.rangeBetween(col('rank_id') + 1, 2 * col('rank_id') - 1)

But this is not supported. The frame has to be of a fixed size and cannot be defined in terms of expressions.

"An important point is that I don't want to collect data"

A window definition without partitionBy:

w = Window.orderBy('rank_id').rangeBetween(-1,3)

is as bad as collecting the data. So even if there were workarounds for the "dynamic frame" problem (conditionals with an unbounded window), they would not help you here.
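
A quick way to see this (my own check, not part of the original answer; the exact plan text varies by Spark version) is to inspect the fixed-width example from the question:

# With no partitionBy, Spark sorts the whole dataset into a single partition
# before it can evaluate the window frame.
df1.explain()                         # look for a single-partition exchange
print(df1.rdd.getNumPartitions())     # typically 1 after the global sort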