何时使用 mapParitions 和 mapPartitionsWithIndex?
when to use mapParitions and mapPartitionsWithIndex?
PySpark 文档描述了两个函数:
mapPartitions(f, preservesPartitioning=False)
Return a new RDD by applying a function to each partition of this RDD.
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
还有...
mapPartitionsWithIndex(f, preservesPartitioning=False)
Return a new RDD by applying a function to each partition of this RDD,
while tracking the index of the original partition.
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(splitIndex, iterator): yield splitIndex
>>> rdd.mapPartitionsWithIndex(f).sum()
6
这些功能试图解决哪些用例?我不明白为什么需要它们。
要回答这个问题,我们需要将 map 与 mapPartitions/mapPartitionsWithIndex 进行比较(mapPartitions 和 mapPartitionsWithIndex 几乎做同样的事情,除了 mapPartitionsWithIndex 您可以跟踪正在处理的分区)。
现在 mapPartitions 和 mapPartitionsWithIndex 用于优化应用程序的性能。为了便于理解,假设您的 RDD 中的所有元素都是 XML 元素,您需要一个解析器来处理它们中的每一个。所以你必须使用一个好的解析器实例 class 才能继续。您可以通过两种方式做到这一点:
map + foreach: 在这种情况下,对于每个元素,解析器实例 class 将被创建,元素将被处理,然后实例将及时销毁,但此实例不会用于其他元素。因此,如果您正在使用分布在 4 个分区中的 12 个元素的 RDD,则解析器实例将被创建 12 次。如您所知,创建实例是一项非常昂贵的操作,因此需要时间。
mapPartitions/mapPartitionsWithIndex: 这两个方法可以稍微解决上面的情况。 mapPartitions/mapPartitionsWithIndex 适用于分区,而不适用于元素(请不要误会我的意思,将处理所有元素)。这些方法将为每个分区创建一次解析器实例。由于您只有 4 个分区,解析器实例将被创建 4 次(对于此示例,比 map 少 8 次)。但是您将传递给这些方法的函数应该采用 Iterator 对象(一次将分区的所有元素作为输入)。所以在 mapPartitions 和 mapPartitionsWithIndex 的情况下,将创建解析器实例,将处理当前分区的所有元素,然后实例将在稍后由 GC 销毁。您会注意到它们可以显着提高您的应用程序的性能。
所以bottom-line是,每当你看到一些操作对所有元素都是通用的,一般来说,你可以做一次就可以处理所有的,最好是选择 mapPartitions/mapPartitionsWithIndex.
请在下面两个链接中找到带有代码示例的解释:
https://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/
http://apachesparkbook.blogspot.in/2015/11/mappartition-example.html
PySpark 文档描述了两个函数:
mapPartitions(f, preservesPartitioning=False) Return a new RDD by applying a function to each partition of this RDD. >>> rdd = sc.parallelize([1, 2, 3, 4], 2) >>> def f(iterator): yield sum(iterator) >>> rdd.mapPartitions(f).collect() [3, 7]
还有...
mapPartitionsWithIndex(f, preservesPartitioning=False) Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition. >>> rdd = sc.parallelize([1, 2, 3, 4], 4) >>> def f(splitIndex, iterator): yield splitIndex >>> rdd.mapPartitionsWithIndex(f).sum() 6
这些功能试图解决哪些用例?我不明白为什么需要它们。
要回答这个问题,我们需要将 map 与 mapPartitions/mapPartitionsWithIndex 进行比较(mapPartitions 和 mapPartitionsWithIndex 几乎做同样的事情,除了 mapPartitionsWithIndex 您可以跟踪正在处理的分区)。
现在 mapPartitions 和 mapPartitionsWithIndex 用于优化应用程序的性能。为了便于理解,假设您的 RDD 中的所有元素都是 XML 元素,您需要一个解析器来处理它们中的每一个。所以你必须使用一个好的解析器实例 class 才能继续。您可以通过两种方式做到这一点:
map + foreach: 在这种情况下,对于每个元素,解析器实例 class 将被创建,元素将被处理,然后实例将及时销毁,但此实例不会用于其他元素。因此,如果您正在使用分布在 4 个分区中的 12 个元素的 RDD,则解析器实例将被创建 12 次。如您所知,创建实例是一项非常昂贵的操作,因此需要时间。
mapPartitions/mapPartitionsWithIndex: 这两个方法可以稍微解决上面的情况。 mapPartitions/mapPartitionsWithIndex 适用于分区,而不适用于元素(请不要误会我的意思,将处理所有元素)。这些方法将为每个分区创建一次解析器实例。由于您只有 4 个分区,解析器实例将被创建 4 次(对于此示例,比 map 少 8 次)。但是您将传递给这些方法的函数应该采用 Iterator 对象(一次将分区的所有元素作为输入)。所以在 mapPartitions 和 mapPartitionsWithIndex 的情况下,将创建解析器实例,将处理当前分区的所有元素,然后实例将在稍后由 GC 销毁。您会注意到它们可以显着提高您的应用程序的性能。
所以bottom-line是,每当你看到一些操作对所有元素都是通用的,一般来说,你可以做一次就可以处理所有的,最好是选择 mapPartitions/mapPartitionsWithIndex.
请在下面两个链接中找到带有代码示例的解释: https://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/ http://apachesparkbook.blogspot.in/2015/11/mappartition-example.html