在 spark 中,广播是如何工作的?

In spark, how does broadcast work?

这是一个非常简单的问题:在spark中,broadcast可以用来高效地向executor发送变量。这是如何工作的?

更准确地说:

  • 一播
  • 它使用 Torrent 协议发送给所有执行者,但仅在需要时加载
  • 一旦加载的变量被反序列化存储在内存中
  • 它:

    • 验证广播没有被销毁
    • 从 blockManager 延迟加载变量

简答

  • 值在执行程序中第一次需要时发送。调用 sc.broadcast(variable) 时不发送任何内容。
  • 数据仅发送到包含需要它的执行程序的节点。
  • 数据存储在内存中。如果没有足够的可用内存,则使用磁盘。
  • 是的,访问局部变量和访问广播变量有很大区别。广播变量必须在第一次访问时下载。

长答案

答案在 Spark 的源代码中,在 TorrentBroadcast.scala

  1. 调用sc.broadcast时,会从BroadcastFactory.scala实例化一个新的TorrentBroadcast对象。以下发生在 writeBlocks() 中,它在初始化 TorrentBroadcast 对象时被调用:

    1. 对象使用 MEMORY_AND_DISK 策略在本地缓存反序列化。
    2. 已连载
    3. 序列化版本被分成 4Mb 块,压缩[0],并保存在本地[ 1].
  2. 创建新执行器时,它们只有轻量级 TorrentBroadcast 对象,仅包含广播对象的标识符及其块数。

  3. TorrentBroadcast 对象有一个包含其值的惰性[2] 属性。当调用value方法时,返回这个惰性的属性。因此,第一次在任务上调用此值函数时,会发生以下情况:

    1. 以随机顺序从本地块管理器中获取块并解压缩。
    2. 如果它们不存在于本地块管理器中,则在块管理器上调用 getRemoteBytes 来获取它们。 仅在那个时候发生网络流量
    3. 如果该块不存在于本地,则使用 MEMORY_AND_DISK_SER 对其进行缓存。

[0] 默认用lz4压缩。 This can be tuned.

[1]块存储在本地块管理器,使用MEMORY_AND_DISK_SER,这意味着它将不适合内存的分区溢出到磁盘。每个块都有一个唯一标识符,根据广播变量的标识符及其偏移量计算得出。块大小可配置;默认为 4Mb。

[2] scala中的一个lazy val是一个变量,它的值在第一次被访问时被计算,然后被缓存. See the documentation.