Spark - 如何使用有状态映射器对排序的 RDD 进行平面映射?
Spark - how to flatmap sorted RDD using a stateful mapper?
基本上我有一个 RDD/DataFrame 包含一系列事件(带有一些 categoryId)。它们带有时间戳并按时间排序。
我想做的是扫描每个类别中的所有这些事件,同时 keeping/updating 某些状态会记住是否看到了某些事件。一些例子:
- 用户登录亚马逊(记录新会话ID、时间戳)
- 用户将商品添加到 basked(将购物篮大小增加到 1)
- 用户执行结帐(增加花费的钱,输出rdd添加项目:sessionId+start_timestamp+number if items + money spented)
- 用户在购物车中添加了其他东西
- 执行结帐 -> 将下一项添加到输出 rdd
所以我非常想用有状态映射器(它会记住以前的项目)来做一个平面地图。映射器可以有一个 "states" 按 categoryId 的映射。但是有几百万个类别是什么?有没有比按类别+时间戳排序更好的方法?
我还需要确保整个类别都在一个节点上。在这种情况下我应该按类别划分吗?我不确定数百万个分区是否是个好主意。
由于您的问题很笼统,您会得到一个笼统的答案。除非你有充分的理由不这样做,否则你应该使用 Data Frames and Window Functions。
上面的第一个会给你所有的好处 Catalyst Optimizer。第二个应该提供您可以按照您描述的方式处理数据的操作:
PARTITION BY
- 按类别划分数据
ORDER BY
- 按时间戳排序
FRAME
(ROWS
/ RANGE
) - window 大小的可选限制
- 实际functions执行所需的操作
旁注:
I'm not sure if millions of partitions are good idea.
不,这根本不是一个好主意,但按某个键进行分区并不意味着您需要与唯一键数相同的分区数:
import org.apache.spark.HashPartitioner
val rdd = sc.parallelize(
(1 to 10).flatMap(k => (1 to 100).map(_ => (k, scala.util.Random.nextInt)))
).partitionBy(new HashPartitioner(2))
在上面的示例中,您有 10 个不同的值,但只有 2 个分区。
基本上我有一个 RDD/DataFrame 包含一系列事件(带有一些 categoryId)。它们带有时间戳并按时间排序。 我想做的是扫描每个类别中的所有这些事件,同时 keeping/updating 某些状态会记住是否看到了某些事件。一些例子:
- 用户登录亚马逊(记录新会话ID、时间戳)
- 用户将商品添加到 basked(将购物篮大小增加到 1)
- 用户执行结帐(增加花费的钱,输出rdd添加项目:sessionId+start_timestamp+number if items + money spented)
- 用户在购物车中添加了其他东西
- 执行结帐 -> 将下一项添加到输出 rdd
所以我非常想用有状态映射器(它会记住以前的项目)来做一个平面地图。映射器可以有一个 "states" 按 categoryId 的映射。但是有几百万个类别是什么?有没有比按类别+时间戳排序更好的方法? 我还需要确保整个类别都在一个节点上。在这种情况下我应该按类别划分吗?我不确定数百万个分区是否是个好主意。
由于您的问题很笼统,您会得到一个笼统的答案。除非你有充分的理由不这样做,否则你应该使用 Data Frames and Window Functions。
上面的第一个会给你所有的好处 Catalyst Optimizer。第二个应该提供您可以按照您描述的方式处理数据的操作:
PARTITION BY
- 按类别划分数据ORDER BY
- 按时间戳排序FRAME
(ROWS
/RANGE
) - window 大小的可选限制- 实际functions执行所需的操作
旁注:
I'm not sure if millions of partitions are good idea.
不,这根本不是一个好主意,但按某个键进行分区并不意味着您需要与唯一键数相同的分区数:
import org.apache.spark.HashPartitioner
val rdd = sc.parallelize(
(1 to 10).flatMap(k => (1 to 100).map(_ => (k, scala.util.Random.nextInt)))
).partitionBy(new HashPartitioner(2))
在上面的示例中,您有 10 个不同的值,但只有 2 个分区。