火花流中的并发操作
Concurrent operations in spark streaming
我想了解一些关于火花流执行的内部结构。
如果我有一个流 X,并且在我的程序中我将流 X 发送到函数 A 和函数 B:
在函数 A 中,我在 X->Y->Z 上执行一些 transform/filter 操作等以创建流 Z。现在我在 Z 上执行 forEach 操作并打印输出到文件。
然后在函数 B 中,我减少流 X -> X2(比如每个 RDD 的最小值),并将输出打印到文件
是否为每个 RDD 并行执行这两个函数?它是如何工作的?
谢谢
--- 来自 Spark 社区的评论 ----
我正在添加来自 spark 社区的评论 -
如果您在驱动程序的两个线程中执行收集步骤(foreach 在 1 中,可能在 2 中减少),那么它们将并行执行。先提交给 Spark 的先执行 - 如果需要确保执行顺序,可以使用信号量,但我认为顺序无关紧要。
是的。
它类似于 spark 的执行模型,该模型使用 DAG 和惰性评估,只是在每批新数据上重复流式传输 运行s DAG。
在您的情况下,由于 DAG(或更大 DAG 的子 DAG,如果有人喜欢这样称呼)需要完成每个动作(您拥有的 2 foreachs 中的每一个)不需要一直到源头都有共同的链接,它们 运行 完全在 parallel.The 流式应用程序作为一个整体在应用程序提交时分配给每个执行器的 X 执行器(JVM)和 Y 核心(线程)资源 manager.At 任何时候,X*Y 任务中的给定任务(即线程)将执行 其中一个 的一部分或全部 DAGs.Note应用程序的 2 个给定线程,无论是否在同一个执行程序中,都可以同时执行同一应用程序的不同操作。
@Eswara 的回答似乎是正确的,但它不适用于您的用例,因为您的单独转换 DAG(X->Y->Z
和 X->X2
)在 X
中有一个共同的 DStream 祖先。这意味着当操作 运行 触发这些流中的每一个时,转换 X->Y
和转换 X->X2
不能 同时发生。将会发生的是,RDD X
的分区将以非并行方式为这些转换中的每一个分别计算或从内存(如果缓存)加载。
理想情况下,转换 X->Y
将解析,然后转换 Y->Z
和 X->X2
将并行完成,因为它们之间没有共享状态。我相信 Spark 的流水线架构会为此进行优化。您可以通过持久化 DStream X
确保在 X->X2
上进行更快的计算,这样它就可以从内存中加载,而不是重新计算或从磁盘加载。有关持久性的更多信息,请参阅 here。
有趣的是,如果您可以提供复制存储级别 *_2
(例如 MEMORY_ONLY_2
或 MEMORY_AND_DISK_2
),以便能够同时 运行 转换同源。我认为这些存储级别目前只有 useful against lost partitions,因为重复的分区将代替丢失的分区进行处理。
我想了解一些关于火花流执行的内部结构。
如果我有一个流 X,并且在我的程序中我将流 X 发送到函数 A 和函数 B:
在函数 A 中,我在 X->Y->Z 上执行一些 transform/filter 操作等以创建流 Z。现在我在 Z 上执行 forEach 操作并打印输出到文件。
然后在函数 B 中,我减少流 X -> X2(比如每个 RDD 的最小值),并将输出打印到文件
是否为每个 RDD 并行执行这两个函数?它是如何工作的?
谢谢
--- 来自 Spark 社区的评论 ----
我正在添加来自 spark 社区的评论 -
如果您在驱动程序的两个线程中执行收集步骤(foreach 在 1 中,可能在 2 中减少),那么它们将并行执行。先提交给 Spark 的先执行 - 如果需要确保执行顺序,可以使用信号量,但我认为顺序无关紧要。
是的。 它类似于 spark 的执行模型,该模型使用 DAG 和惰性评估,只是在每批新数据上重复流式传输 运行s DAG。 在您的情况下,由于 DAG(或更大 DAG 的子 DAG,如果有人喜欢这样称呼)需要完成每个动作(您拥有的 2 foreachs 中的每一个)不需要一直到源头都有共同的链接,它们 运行 完全在 parallel.The 流式应用程序作为一个整体在应用程序提交时分配给每个执行器的 X 执行器(JVM)和 Y 核心(线程)资源 manager.At 任何时候,X*Y 任务中的给定任务(即线程)将执行 其中一个 的一部分或全部 DAGs.Note应用程序的 2 个给定线程,无论是否在同一个执行程序中,都可以同时执行同一应用程序的不同操作。
@Eswara 的回答似乎是正确的,但它不适用于您的用例,因为您的单独转换 DAG(X->Y->Z
和 X->X2
)在 X
中有一个共同的 DStream 祖先。这意味着当操作 运行 触发这些流中的每一个时,转换 X->Y
和转换 X->X2
不能 同时发生。将会发生的是,RDD X
的分区将以非并行方式为这些转换中的每一个分别计算或从内存(如果缓存)加载。
理想情况下,转换 X->Y
将解析,然后转换 Y->Z
和 X->X2
将并行完成,因为它们之间没有共享状态。我相信 Spark 的流水线架构会为此进行优化。您可以通过持久化 DStream X
确保在 X->X2
上进行更快的计算,这样它就可以从内存中加载,而不是重新计算或从磁盘加载。有关持久性的更多信息,请参阅 here。
有趣的是,如果您可以提供复制存储级别 *_2
(例如 MEMORY_ONLY_2
或 MEMORY_AND_DISK_2
),以便能够同时 运行 转换同源。我认为这些存储级别目前只有 useful against lost partitions,因为重复的分区将代替丢失的分区进行处理。