当我调用 rdd.join(rdd) 时发生了什么

What is happening when I call rdd.join(rdd)

我正在开发一个应用程序,我需要对 RDD 中具有相同键的每对行执行计算,这是 RDD 结构:

List<Tuple2<String, Tuple2<Integer, Integer>>> dat2 = new ArrayList<>();
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(1, 1)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(2, 5)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(3, 78)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Bob", new Tuple2<Integer, Integer>(1, 6)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Bob", new Tuple2<Integer, Integer>(2, 11)));
JavaRDD<Tuple2<String, Tuple2<Integer, Integer>>> y2 = sc.parallelize(dat2);

现在,每个人的数据都可以这样查看:(时间戳,值)。我想知道每一行在 +-1 时间戳中发生的值的数量。 (我知道这看起来像滑动 window 但我想要事件级别的粒度

y2.join(y2);
resultOfJoin.filter(t -> t._2()._1()._1() - t._2()._2()._1() <= 1 && t._2()._1()._1() - t._2()._2()._1() >= -1)

在这种情况下,我寻求的最佳解决方案是将 RDD 自身连接起来,为每个人创建 k^2 行,其中 k 是与此人关联的行数。

现在,我知道这是一场彻头彻尾的灾难。我知道这会导致洗牌(而且洗牌很糟糕)但我无法提供更好的东西。

我有 3 个问题:

  1. 由于我在连接后立即过滤,是否会影响连接造成的压力(换句话说,是否会有任何优化)?
  2. 网络上传递的行数是多少? (我知道在最坏的情况下,结果 RDD 将有 n^2 行)网络上发送的行是#workersn(只发送一个副本并在 worker 上复制)还是#workersn^2(在结果 worker 上为每 2 行组合发送行)?
  3. 如果我愿意与 Dataset 合作,我可以加入过滤器。我知道数据集对计算图有额外的优化。如果我过渡到数据集,我应该期望有多少改进?

Since I filter right after the join, will it effect the stress caused by the join (in other words, will there be any optimizations)?

不,不会有任何优化。

What is the volume of rows passed on the network?

O(N)(具体来说,每条记录将被洗牌两次,每个父项一次)您通过键加入,因此每个项目都进入一个,并且只有一个分区。

If I would of worked with Dataset I could join with filter. I understand Datasets have additional optimization for the computation graph. How much improvement, if any, should I expect if I transit to Datasets?

Shuffle 过程得到了更好的优化,但除此之外,您不能指望任何针对特定情况的优化。

wish to know for every row the number of values happening in +-1 timestamp.

尝试 window 功能:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val w = Window.partitionBy("id").ordetBy("timestamp")

rdd.toDF("id", "data")
  .select($"id", $"data._1" as "timestamp", $"data._2" as "value"))
  .withColumn("lead", lead($"value", 1).over(w))
  .withColumn("lag", lag($"value", 1).over(w))