Dataproc conflict in Hadoop temporary tables

I have a process that executes Spark jobs in parallel on Dataproc clusters in different regions. It creates a cluster for each region, runs the Spark job, and deletes the cluster when the job finishes.

The Spark job uses the org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset method, passing a BigQuery configuration, to save data to BigQuery tables. Each job saves data to more than one table, so it calls saveAsNewAPIHadoopDataset more than once.
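
For context, a minimal sketch of that kind of direct write, under my own assumptions (placeholder table id and schema, a placeholder rows RDD, and the older BigQueryOutputFormat, which matches the BigQueryOutputCommitter visible in the stack trace below); it is not the exact code from the job:

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;

// Direct write: the committer for this format stages data in a temporary BigQuery dataset per job.
static void writeDirect(JavaPairRDD<Object, JsonObject> rows) {
    Configuration conf = new Configuration();
    // Fully qualified output table ("<project>:<dataset>.<table>") and the table schema as JSON.
    BigQueryConfiguration.configureBigQueryOutput(
        conf,
        "my-gcp-project:MY_DATASET.MY_TABLE1",
        "[{\"name\":\"field1\",\"type\":\"STRING\"}]");
    conf.set("mapreduce.job.outputformat.class", BigQueryOutputFormat.class.getName());
    rows.saveAsNewAPIHadoopDataset(conf);
}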

The problem is that sometimes I get an error caused by a conflict on the Hadoop temporary BigQuery dataset that is created internally to run the job:

Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 409 Conflict
{
 "code" : 409,
 "errors" : [ {
   "domain" : "global",
   "message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013",
   "reason" : "duplicate"
 } ],
 "message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013"
}
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.interceptResponse(AbstractGoogleClientRequest.java:321)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.io.bigquery.BigQueryOutputCommitter.setupJob(BigQueryOutputCommitter.java:107)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset.apply$mcV$sp(PairRDDFunctions.scala:1150)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset.apply(PairRDDFunctions.scala:1078)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset.apply(PairRDDFunctions.scala:1078)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1078)
    at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:819)
    ...

The dataset name in the exception above ends with 201802250620_0013, which looks like a timestamp plus a _0013 suffix, but I am not sure it actually represents the time.

My thinking is that sometimes jobs run at the same moment and try to create a dataset whose name contains the same timestamp, either across parallel jobs or within the same job on another saveAsNewAPIHadoopDataset call.

How can we avoid this error without delaying job execution?

The dependency I am using is:

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>0.10.2-hadoop2</version>
    <scope>provided</scope>
</dependency>

The Dataproc image version is 1.1.

Edit 1:

I tried using IndirectBigQueryOutputFormat, but now I get an error saying that the GCS output path already exists, even though I pass it a different path on every saveAsNewAPIHadoopDataset call.

Here is my code:

SparkConf sc = new SparkConf().setAppName("MyApp");

try (JavaSparkContext jsc = new JavaSparkContext(sc)) {
    JavaPairRDD<String, String> filesJson = jsc.wholeTextFiles(jsonFolder, parts);
    JavaPairRDD<String, String> jsons = filesJson.flatMapToPair(new FileSplitter()).repartition(parts);
    JavaPairRDD<Object, JsonObject> objsJson = jsons.flatMapToPair(new JsonParser()).filter(t -> t._2() != null).cache();

    objsJson
    .filter(new FilterType(MSG_TYPE1))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE1", "gs://my-bucket/tmp1"));

    objsJson
    .filter(new FilterType(MSG_TYPE2))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE2", "gs://my-bucket/tmp2"));

    objsJson
    .filter(new FilterType(MSG_TYPE3))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE3", "gs://my-bucket/tmp3"));

    // here goes another ingestion process: same code as above but with different params, parsers, etc.
}

Configuration createConf(String table, String outGCS) {
  Configuration conf = new Configuration();
  BigQueryOutputConfiguration.configure(conf, table, null, outGCS, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class);
  conf.set("mapreduce.job.outputformat.class", IndirectBigQueryOutputFormat.class.getName());
  return conf;
}
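
One workaround I am considering for the "path already exists" error (my own sketch, not something from the docs) is to suffix each GCS temporary path with a random value so that no run ever points at an output directory that is already there. Reusing the createConf helper above:

import java.util.UUID;

// Fresh staging path per invocation, e.g. gs://my-bucket/tmp1/3f2a9c...
String uniqueTmp = "gs://my-bucket/tmp1/" + UUID.randomUUID();

objsJson
    .filter(new FilterType(MSG_TYPE1))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE1", uniqueTmp));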

I believe what may be happening is that each mapper tries to create its own dataset. This is rather inefficient (and consumes daily quota proportional to the number of mappers).

An alternative is to use the IndirectBigQueryOutputFormat output class:

IndirectBigQueryOutputFormat works by first buffering all the data into a Cloud Storage temporary table, and then, on commitJob, copies all data from Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs since it only requires one BigQuery "load" job per Hadoop/Spark job, as compared to BigQueryOutputFormat, which performs one BigQuery job for each Hadoop/Spark task.

See an example here: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
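
Since the indirect format stages everything under the given Cloud Storage path, it also seems sensible to clean that path up once the Spark job has finished, so that a later run does not trip over leftover files. A minimal sketch with the same placeholder path as in the question (the Hadoop FileSystem calls throw IOException, so run this from code that handles it):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// The outGCS path that was passed to createConf for this write.
Path tmpPath = new Path("gs://my-bucket/tmp1");
FileSystem fs = tmpPath.getFileSystem(new Configuration());
// Recursively remove the staged newline-delimited JSON files after the BigQuery load completes.
fs.delete(tmpPath, true);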