Spark 作业返回后不久,推测任务能否继续 运行?

Can a speculative task continue running shortly after a Spark job returned?

我的一个简单的 Spark 作业有问题,简化后看起来像这样。

JavaRDD<ObjectNode> rdd = pullAndProcessData();
ManifestFilesystem fs = getOutputFS();
List<WriteObjectResult> writeObjectResults = rdd.mapPartitions(fs::write).collect();
fs.writeManifest(Manifest.makeManifest(writeObjectResults));

我对这段代码的期望是,无论发生什么,当且仅当所有任务都已完成并将其分区成功写入 S3 时,writeManifest 才会被调用。问题是,显然,一些任务在清单之后写入 S3,这永远不会发生。

ManifestFilesystem.write 中,我删除了现有的清单(如果有的话)使其失效,因为正常的工作流程应该是:

我怀疑它可能是由于推测的任务而发生的,在以下情况下:

这是可能发生的事情吗?有人对这种行为有另一种假设吗?

注意:不能使用内置数据发布方法

注2:我实际上发现了this这倾向于证实我的直觉,但如果能得到证实还是很棒的,因为我没有使用标准由于超出此问题范围的原因,HDFS 或 S3 read/write 方法。

Spark 不会主动终止推测性任务。它只是等到任务完成并忽略结果。我认为您的推测任务完全有可能在 collect 调用后继续写入。

在意识到从 Spark 的角度来看没有办法解决这个问题后,我将回答我自己的问题:如何确保在所有推测性任务有时间完成之前杀死它们?实际上最好完全让它们 运行 ,否则它们可能会在写入文件时被杀死,然后将被 t运行cated.

有不同的可能方法:

  • this thread 中的几条消息表明,一种常见做法是在执行原子重命名之前写入临时 attempt 文件(便宜大多数文件系统,因为它只是一个指针开关)。如果推测任务试图将其临时文件重命名为现有名称(如果操作是原子操作,则不会同时发生),则忽略重命名请求并删除临时文件。

  • 据我所知,S3 不提供原子重命名。另外,虽然上述过程相当容易实施,但我们目前正在尝试最大限度地限制自制解决方案并保持系统简单。因此,我的最终解决方案是使用 jobId(例如,作业开始时的时间戳)并将其传递给从属设备并将其写入清单中。将文件写入 FS 时,将应用以下逻辑:

    public WriteObjectResult write(File localTempFile, long jobId) {
        // cheap operation to check if the manifest is already there
        if (manifestsExists()) {
             long manifestJobId = Integer.parseInt(getManifestMetadata().get("jobId"));
             if (manifestJobId == jobId) {
                 log.warn("Job " + jobId + " has already completed successfully and published a manifest. Ignoring write request."
                 return null;
             }
             log.info("A manifest has already been published by job " + jobId + " for this dataset. Invalidating manifest.");
             deleteExistingManifest();
        }    
        return publish(localTempFile);
    }