在 spark 中,广播是如何工作的?
In spark, how does broadcast work?
这是一个非常简单的问题:在spark中,broadcast
可以用来高效地向executor发送变量。这是如何工作的?
更准确地说:
- 何时发送值:我一调用
broadcast
,或何时使用这些值?
- 数据究竟发送到哪里:发送给所有执行者,还是只发送给需要它的执行者?
- 数据存储在哪里?在内存中,还是在磁盘上?
- 简单变量和广播变量的访问方式有区别吗?当我调用
.value
方法时,幕后发生了什么?
- 一播
- 它使用 Torrent 协议发送给所有执行者,但仅在需要时加载
- 一旦加载的变量被反序列化存储在内存中
它:
- 验证广播没有被销毁
- 从 blockManager 延迟加载变量
简答
- 值在执行程序中第一次需要时发送。调用
sc.broadcast(variable)
时不发送任何内容。
- 数据仅发送到包含需要它的执行程序的节点。
- 数据存储在内存中。如果没有足够的可用内存,则使用磁盘。
- 是的,访问局部变量和访问广播变量有很大区别。广播变量必须在第一次访问时下载。
长答案
答案在 Spark 的源代码中,在 TorrentBroadcast.scala
。
调用sc.broadcast
时,会从BroadcastFactory.scala
实例化一个新的TorrentBroadcast
对象。以下发生在 writeBlocks()
中,它在初始化 TorrentBroadcast 对象时被调用:
- 对象使用
MEMORY_AND_DISK
策略在本地缓存反序列化。
- 已连载
- 序列化版本被分成 4Mb 块,压缩[0],并保存在本地[ 1].
创建新执行器时,它们只有轻量级 TorrentBroadcast
对象,仅包含广播对象的标识符及其块数。
TorrentBroadcast
对象有一个包含其值的惰性[2] 属性。当调用value
方法时,返回这个惰性的属性。因此,第一次在任务上调用此值函数时,会发生以下情况:
- 以随机顺序从本地块管理器中获取块并解压缩。
- 如果它们不存在于本地块管理器中,则在块管理器上调用
getRemoteBytes
来获取它们。 仅在那个时候发生网络流量。
- 如果该块不存在于本地,则使用
MEMORY_AND_DISK_SER
对其进行缓存。
[0] 默认用lz4压缩。 This can be tuned.
[1]块存储在本地块管理器,使用MEMORY_AND_DISK_SER
,这意味着它将不适合内存的分区溢出到磁盘。每个块都有一个唯一标识符,根据广播变量的标识符及其偏移量计算得出。块大小可配置;默认为 4Mb。
[2] scala中的一个lazy val是一个变量,它的值在第一次被访问时被计算,然后被缓存. See the documentation.
这是一个非常简单的问题:在spark中,broadcast
可以用来高效地向executor发送变量。这是如何工作的?
更准确地说:
- 何时发送值:我一调用
broadcast
,或何时使用这些值? - 数据究竟发送到哪里:发送给所有执行者,还是只发送给需要它的执行者?
- 数据存储在哪里?在内存中,还是在磁盘上?
- 简单变量和广播变量的访问方式有区别吗?当我调用
.value
方法时,幕后发生了什么?
- 一播
- 它使用 Torrent 协议发送给所有执行者,但仅在需要时加载
- 一旦加载的变量被反序列化存储在内存中
它:
- 验证广播没有被销毁
- 从 blockManager 延迟加载变量
简答
- 值在执行程序中第一次需要时发送。调用
sc.broadcast(variable)
时不发送任何内容。 - 数据仅发送到包含需要它的执行程序的节点。
- 数据存储在内存中。如果没有足够的可用内存,则使用磁盘。
- 是的,访问局部变量和访问广播变量有很大区别。广播变量必须在第一次访问时下载。
长答案
答案在 Spark 的源代码中,在 TorrentBroadcast.scala
。
调用
sc.broadcast
时,会从BroadcastFactory.scala
实例化一个新的TorrentBroadcast
对象。以下发生在writeBlocks()
中,它在初始化 TorrentBroadcast 对象时被调用:- 对象使用
MEMORY_AND_DISK
策略在本地缓存反序列化。 - 已连载
- 序列化版本被分成 4Mb 块,压缩[0],并保存在本地[ 1].
- 对象使用
创建新执行器时,它们只有轻量级
TorrentBroadcast
对象,仅包含广播对象的标识符及其块数。TorrentBroadcast
对象有一个包含其值的惰性[2] 属性。当调用value
方法时,返回这个惰性的属性。因此,第一次在任务上调用此值函数时,会发生以下情况:- 以随机顺序从本地块管理器中获取块并解压缩。
- 如果它们不存在于本地块管理器中,则在块管理器上调用
getRemoteBytes
来获取它们。 仅在那个时候发生网络流量。 - 如果该块不存在于本地,则使用
MEMORY_AND_DISK_SER
对其进行缓存。
[0] 默认用lz4压缩。 This can be tuned.
[1]块存储在本地块管理器,使用MEMORY_AND_DISK_SER
,这意味着它将不适合内存的分区溢出到磁盘。每个块都有一个唯一标识符,根据广播变量的标识符及其偏移量计算得出。块大小可配置;默认为 4Mb。
[2] scala中的一个lazy val是一个变量,它的值在第一次被访问时被计算,然后被缓存. See the documentation.