Spark 将数据拉入 RDD 或数据框或数据集中
Spark pulling data into RDD or dataframe or dataset
当spark通过driver拉取数据,然后当spark不需要通过driver拉取数据时,我试图用简单的术语来说明。
我有 3 个问题 -
- 假设你有一个 20 TB 的平面文件存储在 HDFS 中,你可以从一个 driver 程序中使用相应库的开箱即用函数将其拉入数据框或 RDD (
sc.textfile(path)
或 sc.textfile(path).toDF
,等等)。如果driver是运行只有32GB内存,会不会导致driver程序出现OOM?或者至少对 driver Jim 进行互换?或者 spark 和 hadoop 是否足够聪明,可以将数据从 HDFS 分发到 spark 执行器中,从而在不经过 driver 的情况下生成 dataframe/RDD?
- 与 1 完全相同的问题,除了来自外部 RDBMS?
- 与 1 完全相同的问题,除了来自特定节点文件系统(仅 Unix 文件系统,20 TB 文件但不是 HDFS)?
关于 1
Spark 使用分布式数据结构,如 RDD 和 Dataset(以及 2.0 之前的 Dataframe)。为了得到问题的答案,您应该了解以下有关此数据结构的事实:
- 所有的转换操作,如(映射、过滤等)都是惰性的。
这意味着除非您需要
您操作的具体结果(例如减少、折叠或保存
结果到一些文件)。
- 在 HDFS 上处理文件时,Spark 运行
带文件分区。分区是最小的逻辑数据批次
可以处理。通常一个分区等于一个HDFS
块,分区总数永远不能少于
文件中的块数。常见(默认)HDFS 块大小为 128Mb
- RDD 中的所有实际计算(包括从 HDFS 读取)和
数据集在执行器内部执行,从不在 driver 上执行。 Driver
创建 DAG 和逻辑执行计划并将任务分配给
执行者进行进一步处理。
- 每个执行者运行之前的
针对特定数据分区分配的任务。所以通常情况下,如果你只为你的执行器分配一个核心,它会同时处理不超过 128Mb(默认 HDFS 块大小)的数据。
所以基本上当您调用 sc.textFile
时不会发生实际读取。所有提到的事实都解释了为什么即使处理 20 Tb 的数据也不会发生 OOM。
有一些特殊情况,例如 join
操作。但即使在这种情况下,所有执行程序都会将它们的中间结果刷新到本地磁盘以进行进一步处理。
关于 2
如果是 JDBC,您可以决定 table 有多少个分区。并在 table 中选择适当的分区键,将数据正确地分割成分区。有多少数据同时加载到内存中由你决定。
关于 3
本地文件的块大小由fs.local.block.size
属性控制(我猜默认是32Mb)。因此它与 1(HDFS 文件)基本相同,只是您将从一台机器和一个物理磁盘驱动器读取所有数据(这对于 20TB 文件来说效率极低)。
当spark通过driver拉取数据,然后当spark不需要通过driver拉取数据时,我试图用简单的术语来说明。
我有 3 个问题 -
- 假设你有一个 20 TB 的平面文件存储在 HDFS 中,你可以从一个 driver 程序中使用相应库的开箱即用函数将其拉入数据框或 RDD (
sc.textfile(path)
或sc.textfile(path).toDF
,等等)。如果driver是运行只有32GB内存,会不会导致driver程序出现OOM?或者至少对 driver Jim 进行互换?或者 spark 和 hadoop 是否足够聪明,可以将数据从 HDFS 分发到 spark 执行器中,从而在不经过 driver 的情况下生成 dataframe/RDD? - 与 1 完全相同的问题,除了来自外部 RDBMS?
- 与 1 完全相同的问题,除了来自特定节点文件系统(仅 Unix 文件系统,20 TB 文件但不是 HDFS)?
关于 1
Spark 使用分布式数据结构,如 RDD 和 Dataset(以及 2.0 之前的 Dataframe)。为了得到问题的答案,您应该了解以下有关此数据结构的事实:
- 所有的转换操作,如(映射、过滤等)都是惰性的。 这意味着除非您需要 您操作的具体结果(例如减少、折叠或保存 结果到一些文件)。
- 在 HDFS 上处理文件时,Spark 运行 带文件分区。分区是最小的逻辑数据批次 可以处理。通常一个分区等于一个HDFS 块,分区总数永远不能少于 文件中的块数。常见(默认)HDFS 块大小为 128Mb
- RDD 中的所有实际计算(包括从 HDFS 读取)和 数据集在执行器内部执行,从不在 driver 上执行。 Driver 创建 DAG 和逻辑执行计划并将任务分配给 执行者进行进一步处理。
- 每个执行者运行之前的 针对特定数据分区分配的任务。所以通常情况下,如果你只为你的执行器分配一个核心,它会同时处理不超过 128Mb(默认 HDFS 块大小)的数据。
所以基本上当您调用 sc.textFile
时不会发生实际读取。所有提到的事实都解释了为什么即使处理 20 Tb 的数据也不会发生 OOM。
有一些特殊情况,例如 join
操作。但即使在这种情况下,所有执行程序都会将它们的中间结果刷新到本地磁盘以进行进一步处理。
关于 2
如果是 JDBC,您可以决定 table 有多少个分区。并在 table 中选择适当的分区键,将数据正确地分割成分区。有多少数据同时加载到内存中由你决定。
关于 3
本地文件的块大小由fs.local.block.size
属性控制(我猜默认是32Mb)。因此它与 1(HDFS 文件)基本相同,只是您将从一台机器和一个物理磁盘驱动器读取所有数据(这对于 20TB 文件来说效率极低)。