Spark 作业返回后不久,推测任务能否继续 运行?
Can a speculative task continue running shortly after a Spark job returned?
我的一个简单的 Spark 作业有问题,简化后看起来像这样。
JavaRDD<ObjectNode> rdd = pullAndProcessData();
ManifestFilesystem fs = getOutputFS();
List<WriteObjectResult> writeObjectResults = rdd.mapPartitions(fs::write).collect();
fs.writeManifest(Manifest.makeManifest(writeObjectResults));
我对这段代码的期望是,无论发生什么,当且仅当所有任务都已完成并将其分区成功写入 S3 时,writeManifest
才会被调用。问题是,显然,一些任务在清单之后写入 S3,这永远不会发生。
在 ManifestFilesystem.write
中,我删除了现有的清单(如果有的话)使其失效,因为正常的工作流程应该是:
- 将所有分区写入S3
- 将清单写入 S3
我怀疑它可能是由于推测的任务而发生的,在以下情况下:
- 一些任务被标记为可预测并重新发送给其他从属
- 所有推测的任务 return 在至少一个他们被发送到的奴隶上,但其中一些保持 运行 在较慢的奴隶上
- Spark 不会中断任务或return在任务中断之前将
collect
的结果发送给驱动程序
- 仍然是 运行 的推测任务最终执行
ManifestTimeslice.write
并在写入它们的分区之前删除清单
这是可能发生的事情吗?有人对这种行为有另一种假设吗?
注意:不能使用内置数据发布方法
注2:我实际上发现了this这倾向于证实我的直觉,但如果能得到证实还是很棒的,因为我没有使用标准由于超出此问题范围的原因,HDFS 或 S3 read/write 方法。
Spark 不会主动终止推测性任务。它只是等到任务完成并忽略结果。我认为您的推测任务完全有可能在 collect
调用后继续写入。
在意识到从 Spark 的角度来看没有办法解决这个问题后,我将回答我自己的问题:如何确保在所有推测性任务有时间完成之前杀死它们?实际上最好完全让它们 运行 ,否则它们可能会在写入文件时被杀死,然后将被 t运行cated.
有不同的可能方法:
this thread 中的几条消息表明,一种常见做法是在执行原子重命名之前写入临时 attempt 文件(便宜大多数文件系统,因为它只是一个指针开关)。如果推测任务试图将其临时文件重命名为现有名称(如果操作是原子操作,则不会同时发生),则忽略重命名请求并删除临时文件。
据我所知,S3 不提供原子重命名。另外,虽然上述过程相当容易实施,但我们目前正在尝试最大限度地限制自制解决方案并保持系统简单。因此,我的最终解决方案是使用 jobId
(例如,作业开始时的时间戳)并将其传递给从属设备并将其写入清单中。将文件写入 FS 时,将应用以下逻辑:
public WriteObjectResult write(File localTempFile, long jobId) {
// cheap operation to check if the manifest is already there
if (manifestsExists()) {
long manifestJobId = Integer.parseInt(getManifestMetadata().get("jobId"));
if (manifestJobId == jobId) {
log.warn("Job " + jobId + " has already completed successfully and published a manifest. Ignoring write request."
return null;
}
log.info("A manifest has already been published by job " + jobId + " for this dataset. Invalidating manifest.");
deleteExistingManifest();
}
return publish(localTempFile);
}
我的一个简单的 Spark 作业有问题,简化后看起来像这样。
JavaRDD<ObjectNode> rdd = pullAndProcessData();
ManifestFilesystem fs = getOutputFS();
List<WriteObjectResult> writeObjectResults = rdd.mapPartitions(fs::write).collect();
fs.writeManifest(Manifest.makeManifest(writeObjectResults));
我对这段代码的期望是,无论发生什么,当且仅当所有任务都已完成并将其分区成功写入 S3 时,writeManifest
才会被调用。问题是,显然,一些任务在清单之后写入 S3,这永远不会发生。
在 ManifestFilesystem.write
中,我删除了现有的清单(如果有的话)使其失效,因为正常的工作流程应该是:
- 将所有分区写入S3
- 将清单写入 S3
我怀疑它可能是由于推测的任务而发生的,在以下情况下:
- 一些任务被标记为可预测并重新发送给其他从属
- 所有推测的任务 return 在至少一个他们被发送到的奴隶上,但其中一些保持 运行 在较慢的奴隶上
- Spark 不会中断任务或return在任务中断之前将
collect
的结果发送给驱动程序 - 仍然是 运行 的推测任务最终执行
ManifestTimeslice.write
并在写入它们的分区之前删除清单
这是可能发生的事情吗?有人对这种行为有另一种假设吗?
注意:不能使用内置数据发布方法
注2:我实际上发现了this这倾向于证实我的直觉,但如果能得到证实还是很棒的,因为我没有使用标准由于超出此问题范围的原因,HDFS 或 S3 read/write 方法。
Spark 不会主动终止推测性任务。它只是等到任务完成并忽略结果。我认为您的推测任务完全有可能在 collect
调用后继续写入。
在意识到从 Spark 的角度来看没有办法解决这个问题后,我将回答我自己的问题:如何确保在所有推测性任务有时间完成之前杀死它们?实际上最好完全让它们 运行 ,否则它们可能会在写入文件时被杀死,然后将被 t运行cated.
有不同的可能方法:
this thread 中的几条消息表明,一种常见做法是在执行原子重命名之前写入临时 attempt 文件(便宜大多数文件系统,因为它只是一个指针开关)。如果推测任务试图将其临时文件重命名为现有名称(如果操作是原子操作,则不会同时发生),则忽略重命名请求并删除临时文件。
据我所知,S3 不提供原子重命名。另外,虽然上述过程相当容易实施,但我们目前正在尝试最大限度地限制自制解决方案并保持系统简单。因此,我的最终解决方案是使用
jobId
(例如,作业开始时的时间戳)并将其传递给从属设备并将其写入清单中。将文件写入 FS 时,将应用以下逻辑:public WriteObjectResult write(File localTempFile, long jobId) { // cheap operation to check if the manifest is already there if (manifestsExists()) { long manifestJobId = Integer.parseInt(getManifestMetadata().get("jobId")); if (manifestJobId == jobId) { log.warn("Job " + jobId + " has already completed successfully and published a manifest. Ignoring write request." return null; } log.info("A manifest has already been published by job " + jobId + " for this dataset. Invalidating manifest."); deleteExistingManifest(); } return publish(localTempFile); }