是否可以等到 EMR 集群终止?

Is it possible to wait until an EMR cluster is terminated?

我正在尝试编写一个组件来启动 EMR 集群,运行 该集群上的 Spark 管道,然后在管道完成后关闭该集群。

我已经创建了集群并设置了允许我的主集群的工作机器启动 EMR 集群的权限。但是,我正在努力调试创建的集群并等待管道结束。这是我现在的代码。请注意,我使用的是 Spark Scala,但这非常接近标准 Java 代码:

val runSparkJob = new StepConfig()
  .withName("Run Pipeline")
  .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
  .withHadoopJarStep(
    new HadoopJarStepConfig()
      .withJar("/path/to/jar")
      .withArgs(
        "spark-submit",
        "etc..."
      )
  )

// Create a cluster and run the Spark job on it
val clusterName = "REDACTED Cluster"
val createClusterRequest =
  new RunJobFlowRequest()
    .withName(clusterName)
    .withReleaseLabel(Configs.EMR_RELEASE_LABEL)
    .withSteps(enableDebugging, runSparkJob)
    .withApplications(new Application().withName("Spark"))
    .withLogUri(Configs.LOG_URI_PREFIX)
    .withServiceRole(Configs.SERVICE_ROLE)
    .withJobFlowRole(Configs.JOB_FLOW_ROLE)
    .withInstances(
      new JobFlowInstancesConfig()
        .withEc2SubnetId(Configs.SUBNET)
        .withInstanceCount(Configs.INSTANCE_COUNT)
        .withKeepJobFlowAliveWhenNoSteps(false)
        .withMasterInstanceType(Configs.MASTER_INSTANCE_TYPE)
        .withSlaveInstanceType(Configs.SLAVE_INSTANCE_TYPE)
    )

val newCluster = emr.runJobFlow(createClusterRequest)

我有两个具体问题:

  1. 提交结果后立即调用emr.runJobFlowreturns。有什么方法可以让它阻塞直到集群关闭或以其他方式等到工作流结束?

  2. 我的集群实际上没有启动,当我进入 AWS Console -> EMR -> Events 视图时,我看到一个故障:

    Amazon EMR Cluster j-XXX (REDACTED...) has terminated with errors at 2019-06-13 19:50 UTC with a reason of VALIDATION_ERROR.

有什么方法可以在我的 Java/Scala 应用程序中以编程方式处理此错误?

是的,很可能要等到 EMR 集群终止。

waiters 将阻止执行,直到集群(即作业流)达到特定状态。

val newCluster = emr.runJobFlow(createClusterRequest);
val describeRequest = new DescribeClusterRequest()
    .withClusterId(newCluster.getClusterId())

// Wait until terminated
emr.waiters().clusterTerminated().run(new WaiterParameters(describeRequest))

另外,如果想获取集群的状态(即job flow),可以调用EMR客户端的describeCluster函数。查看链接的文档,因为您可以获得有关集群的状态和状态信息,以确定它是成功还是错误。

val result = emr.describeCluster(describeRequest)

注意:不是最好的 Java-er 所以以上是我最好的猜测以及它如何根据文档工作,但我没有测试以上内容。