Spark RDD 沿袭和存储
Spark RDD Lineage and Storage
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
badLinesCount = badLinesRDD.count()
warningCount = warningsRDD.count()
在上面的代码中,none 的转换会被评估,直到倒数第二行代码被执行,您计算 badLinesRDD 中的对象数。因此,当此 badLinesRDD.count()
为 运行 时,它将计算前四个 RDD 直到并集,然后 return 您得到结果。但是当 warningsRDD.count()
是 运行 时,它只会计算转换 RDD,直到前 3 行并且 return 你的结果正确吗?
此外,当对这些 RDD 转换调用操作时计算这些 RDD 转换时,最后一个 RDD 转换(联合)的对象存储在哪里?它是否存储在过滤器转换为 运行 的每个 DataNode 的内存中?
除非明确保留任务输出(例如 cache
、persist
)或隐式保留(随机写入)并且有足够的空闲 space 每个操作都将执行完整的沿袭。
因此,当您调用 warningsRDD.count()
时,它将加载文件 (sc.textFile("log.txt")
) 和过滤器 (inputRDD.filter(lambda x: "warning" in x)
)。
Also when these RDD transformations are computed when an action is called on them where are the objects from the last RDD transformation, which is union, stored?
假设数据没有持久化,无处可去。在数据传递到下一阶段或输出后,所有任务输出都将被丢弃。是否持久化数据取决于设置(磁盘、堆上、堆外、DFS)。
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
badLinesCount = badLinesRDD.count()
warningCount = warningsRDD.count()
在上面的代码中,none 的转换会被评估,直到倒数第二行代码被执行,您计算 badLinesRDD 中的对象数。因此,当此 badLinesRDD.count()
为 运行 时,它将计算前四个 RDD 直到并集,然后 return 您得到结果。但是当 warningsRDD.count()
是 运行 时,它只会计算转换 RDD,直到前 3 行并且 return 你的结果正确吗?
此外,当对这些 RDD 转换调用操作时计算这些 RDD 转换时,最后一个 RDD 转换(联合)的对象存储在哪里?它是否存储在过滤器转换为 运行 的每个 DataNode 的内存中?
除非明确保留任务输出(例如 cache
、persist
)或隐式保留(随机写入)并且有足够的空闲 space 每个操作都将执行完整的沿袭。
因此,当您调用 warningsRDD.count()
时,它将加载文件 (sc.textFile("log.txt")
) 和过滤器 (inputRDD.filter(lambda x: "warning" in x)
)。
Also when these RDD transformations are computed when an action is called on them where are the objects from the last RDD transformation, which is union, stored?
假设数据没有持久化,无处可去。在数据传递到下一阶段或输出后,所有任务输出都将被丢弃。是否持久化数据取决于设置(磁盘、堆上、堆外、DFS)。