当文件无法放入 spark 的主内存时,spark 如何读取大文件(PB)
How spark read a large file (petabyte) when file can not be fit in spark's main memory
在这些情况下大文件会怎样?
1) Spark 从 NameNode 获取数据的位置。根据 NameNode 的信息,Spark 是否会在同一时间停止,因为数据大小太长?
2) Spark 根据数据节点块大小对数据进行分区,但不能将所有数据存储到主内存中。这里我们没有使用 StorageLevel。那么这里会发生什么呢?
3) Spark 对数据进行分区,一些数据将存储在主内存中,一旦主内存存储的数据再次处理,spark 将从磁盘加载其他数据。
首先,Spark 仅在调用操作(如 count
、collect
或 write
)时才开始读取数据。调用操作后,Spark 会在 partitions 中加载数据 - 同时加载的分区数取决于您可用的内核数。所以在Spark中你可以认为1个partition = 1 core = 1 task。请注意,所有并发加载的分区都必须适合内存,否则会出现 OOM。
假设您有多个阶段,Spark 然后仅在加载的分区上运行第一阶段的转换。一旦它对加载分区中的数据应用了转换,它就会将输出存储为 shuffle-data,然后读入更多分区。然后它在这些分区上应用转换,将输出存储为随机数据,读取更多分区等等,直到读取所有数据。
如果您不应用任何转换,而只执行 count
等操作,Spark 仍会读入分区中的数据,但它不会在您的集群中存储任何数据,如果您执行 count
再次读取所有数据。为避免多次读取数据,您可以调用 cache
或 persist
,在这种情况下,Spark 将 尝试将数据存储在您的集群中。在 cache
上(与 persist(StorageLevel.MEMORY_ONLY)
相同,它将所有分区存储在内存中 - 如果它不适合内存,您将获得 OOM。如果您调用 persist(StorageLevel.MEMORY_AND_DISK)
它将存储尽可能多地放在内存中,其余的将放在磁盘上。如果数据不适合磁盘,OS 通常会杀死你的工人。
请注意,Spark 有自己的小内存管理系统。如果您调用 cache
或 persist
,您分配给 Spark 作业的一些内存用于保存正在处理的数据,一些内存用于存储。
我希望这个解释对您有所帮助:)
这直接引自 Apache Spark 常见问题 (FAQ | Apache Spark)
Does my data need to fit in memory to use Spark?
No. Spark's operators spill data to disk if it does not fit in memory,
allowing it to run well on any sized data. Likewise, cached datasets
that do not fit in memory are either spilled to disk or recomputed on
the fly when needed, as determined by the RDD's storage level.
在 Apache Spark 中,如果数据不适合内存,那么 Spark 会简单地将数据保存到磁盘。
Apache Spark 中的persist 方法提供了六个持久存储级别来持久化数据。
MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER
(Java and Scala), MEMORY_AND_DISK_SER
(Java and Scala), DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, OFF_HEAP.
OFF_HEAP 存储正在试验中。
在这些情况下大文件会怎样?
1) Spark 从 NameNode 获取数据的位置。根据 NameNode 的信息,Spark 是否会在同一时间停止,因为数据大小太长?
2) Spark 根据数据节点块大小对数据进行分区,但不能将所有数据存储到主内存中。这里我们没有使用 StorageLevel。那么这里会发生什么呢?
3) Spark 对数据进行分区,一些数据将存储在主内存中,一旦主内存存储的数据再次处理,spark 将从磁盘加载其他数据。
首先,Spark 仅在调用操作(如 count
、collect
或 write
)时才开始读取数据。调用操作后,Spark 会在 partitions 中加载数据 - 同时加载的分区数取决于您可用的内核数。所以在Spark中你可以认为1个partition = 1 core = 1 task。请注意,所有并发加载的分区都必须适合内存,否则会出现 OOM。
假设您有多个阶段,Spark 然后仅在加载的分区上运行第一阶段的转换。一旦它对加载分区中的数据应用了转换,它就会将输出存储为 shuffle-data,然后读入更多分区。然后它在这些分区上应用转换,将输出存储为随机数据,读取更多分区等等,直到读取所有数据。
如果您不应用任何转换,而只执行 count
等操作,Spark 仍会读入分区中的数据,但它不会在您的集群中存储任何数据,如果您执行 count
再次读取所有数据。为避免多次读取数据,您可以调用 cache
或 persist
,在这种情况下,Spark 将 尝试将数据存储在您的集群中。在 cache
上(与 persist(StorageLevel.MEMORY_ONLY)
相同,它将所有分区存储在内存中 - 如果它不适合内存,您将获得 OOM。如果您调用 persist(StorageLevel.MEMORY_AND_DISK)
它将存储尽可能多地放在内存中,其余的将放在磁盘上。如果数据不适合磁盘,OS 通常会杀死你的工人。
请注意,Spark 有自己的小内存管理系统。如果您调用 cache
或 persist
,您分配给 Spark 作业的一些内存用于保存正在处理的数据,一些内存用于存储。
我希望这个解释对您有所帮助:)
这直接引自 Apache Spark 常见问题 (FAQ | Apache Spark)
Does my data need to fit in memory to use Spark?
No. Spark's operators spill data to disk if it does not fit in memory, allowing it to run well on any sized data. Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD's storage level.
在 Apache Spark 中,如果数据不适合内存,那么 Spark 会简单地将数据保存到磁盘。
Apache Spark 中的persist 方法提供了六个持久存储级别来持久化数据。
MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER
(Java and Scala), MEMORY_AND_DISK_SER
(Java and Scala), DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, OFF_HEAP.
OFF_HEAP 存储正在试验中。