什么时候在 Spark 中的用户定义聚合函数 UDAF 中发生合并

When does merge happen in User Defined Aggregating Functions UDAF in Spark

我想知道Spark在什么情况下会执行合并作为UDAF功能的一部分。

动机: 我在我的 Spark 项目中使用了很多 UDAF 函数 OVER a Window 。我经常想回答这样的问题:

在 30 天的 window 内,在与当前交易相同的国家/地区进行了多少次信用卡交易?

window 将从当前事务开始,但不会将其包括在计数中。它需要当前交易的价值才能知道在过去 30 天内要计算哪个国家/地区。

val rollingWindow = Window
      .partitionBy(partitionByColumn)
      .orderBy(orderByColumn.desc)
      .rangeBetween(0, windowSize)

df.withColumn(
  outputColumnName,
  customUDAF(inputColumn, orderByColumn).over(rollingWindow))

我编写了自定义 UDAF 来进行计数。我总是使用 .orderBy(orderByColumn.desc) 并且由于 .desc 当前交易在计算期间出现在 window 中的第一个。

UDAF 函数需要实现 merge 函数,该函数在并行计算中合并两个中间聚合缓冲区。如果发生任何合并,我的 current transaction 对于不同的缓冲区可能不相同,并且 UDAF 的结果将不正确。

我写了一个 UDAF 函数来计算我的数据集上的合并次数,并且只保留 window 中的第一笔交易与当前交易进行比较。

 class FirstUDAF() extends UserDefinedAggregateFunction {

  def inputSchema = new StructType().add("x", StringType)
    .add("y", StringType)

  def bufferSchema = new StructType()
    .add("first", StringType)
    .add("numMerge", IntegerType)

  def dataType = new StructType()
    .add("firstCode", StringType)
    .add("numMerge", IntegerType)

  def deterministic = true

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = ""
    buffer(1) = 1
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.getString(0) == "")
      buffer(0) = input.getString(0)

  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
  }

  def evaluate(buffer: Row) = buffer
}

当我 运行 在具有 16 cpu 的本地主机上使用 spark 2.0.1 打开它时,从来没有任何合并并且 window 中的第一个事务始终是当前事务.这就是我要的。在不久的将来,我将 运行 我的代码在 x100 更大的数据集和真正的分布式 Spark 集群上,并且想知道合并是否可以在那里发生。

问题:

At which circumstances/conditons mergers take place in UDAF?

merge 在 shuffle ("reduce side aggregation") 之后合并聚合函数 ("map side aggregation") 的部分应用时调用。

Do Windows with orderBy ever have mergers?

当前实施中永远不会。至于现在 window 函数只是花式 groupByKey,没有部分聚合。这当然是实现细节,将来可能会更改,恕不另行通知。

Is it possible to tell Spark not to do mergers?

不是。但是,如果数据已按聚合键分区,则不需要 merge,仅使用 combine

最后:

How many times a credit card transaction was made in the same country as the current transaction in the window of 30 days?

不调用 UDAFs 或 window 函数。我可能会用 o.a.s.sql.functions.window 创建翻滚 windows,按用户、国家/地区和 window 聚合,然后加入输入。