Load CSV File in BigQuery with Dataproc (Spark)

I am trying to read data from a CSV file stored in GCS and save it into a BigQuery table.

Here is my CSV file:

1,Marc,B12,2017-03-24
2,Marc,B12,2018-01-31
3,Marc,B21,2017-03-17
4,Jeam,B12,2017-12-30
5,Jeam,B12,2017-09-02
6,Jeam,B11,2018-06-30
7,Jeam,B21,2018-03-02
8,Olivier,B20,2017-12-30

Here is my code:

import com.google.cloud.hadoop.io.bigquery.{BigQueryConfiguration, BigQueryFileFormat}
import com.google.cloud.hadoop.io.bigquery.output.{BigQueryOutputConfiguration, IndirectBigQueryOutputFormat}
import com.google.gson.Gson
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

  val spark = SparkSession
    .builder()
    .appName("Hyp-session-bq")
    .config("spark.master", "local")
    .getOrCreate()
  val sc: SparkContext = spark.sparkContext


  val conf=sc.hadoopConfiguration

  //Input Parameters
  val projectId = conf.get("fs.gs.project.id")
  val bucket = conf.get("fs.gs.system.bucket")
  val inputTable = s"$projectId:rpc.testBig"

  //Input Configuration
  conf.set(BigQueryConfiguration.PROJECT_ID_KEY,projectId)
  conf.set(BigQueryConfiguration.GCS_BUCKET_KEY,bucket)
  BigQueryConfiguration.configureBigQueryInput(conf,inputTable)

  //Output Parameters
  val outPutTable = s"$projectId:rpc.outTestBig"

  // Temp output bucket that is deleted upon completion of job
  val outPutGcsPath = ("gs://"+bucket+"/hadoop/tmp/outTestBig")

  BigQueryOutputConfiguration.configure(conf,
    outPutTable,
    null,
    outPutGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_,_]])

  conf.set("mapreduce.job.outputformat.class", classOf[IndirectBigQueryOutputFormat[_,_]].getName)

  // Truncate the table before writing output to allow multiple runs.
  conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,"WRITE_TRUNCATE")

  // Read the CSV file from GCS as an RDD of lines
  val text_file = sc.textFile("gs://test_files/csvfiles/test.csv")
  val lignes = text_file.flatMap(x => x.split(" "))

  case class schemaFile(id: Int, name: String, symbole: String, date: String)

  def parseStringWithCaseClass(str: String): schemaFile = {
    val fields = str.split(",")
    schemaFile(fields(0).toInt, fields(1), fields(2), fields(3))
  }

  // Convert each record to a (key, JSON) pair, as expected by the BigQuery output format
  val result1 = lignes.map(x => parseStringWithCaseClass(x))
  val x = result1.map(elem => (null, new Gson().toJsonTree(elem)))
  x.saveAsNewAPIHadoopDataset(conf)

When I run the code, I get this error:

ERROR org.apache.spark.internal.io.SparkHadoopMapReduceWriter: Aborting job job_20180226083501_0008.
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "Load configuration must specify at least one source URI",
    "reason" : "invalid"
  } ],
  "message" : "Load configuration must specify at least one source URI"
}
        at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
        at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.interceptResponse(AbstractGoogleClientRequest.java:321)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.insertJobOrFetchDuplicate(BigQueryHelper.java:306)
        at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.importFromGcs(BigQueryHelper.java:160)
        at com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputCommitter.commitJob(IndirectBigQueryOutputCommitter.java:57)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
        at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset.apply$mcV$sp(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
        at jeam.BigQueryIO$.main(BigQueryIO.scala:115)
        at jeam.BigQueryIO.main(BigQueryIO.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)  

I think the problem comes from the case class and parseStringWithCaseClass, but I don't know how to solve it. I don't think there is a problem with my configuration, because I got the expected result when I tried the wordcount example: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
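To check whether the parsing itself is at fault, the same case-class construction and Gson conversion can be run on a couple of sample lines outside of Spark. The following is only a minimal standalone sketch (the ParseCheck object and SchemaFile name are illustrative, not part of the original job); it should print one JSON object per input line:

    import com.google.gson.Gson

    // Same shape as the schemaFile case class used in the Spark job
    case class SchemaFile(id: Int, name: String, symbole: String, date: String)

    object ParseCheck {
      def parse(str: String): SchemaFile = {
        val fields = str.split(",")
        SchemaFile(fields(0).toInt, fields(1), fields(2), fields(3))
      }

      def main(args: Array[String]): Unit = {
        val samples = Seq("1,Marc,B12,2017-03-24", "8,Olivier,B20,2017-12-30")
        val gson = new Gson()
        // Expected output, one JSON object per line, e.g.
        // {"id":1,"name":"Marc","symbole":"B12","date":"2017-03-24"}
        samples.map(parse).foreach(record => println(gson.toJsonTree(record)))
      }
    }

If this prints well-formed JSON for every sample line, the case class and the Gson conversion are not the problem, and the issue is more likely on the BigQuery/Cloud Storage side.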

I tried with a Tuple4:

  def parseStringWithTuple(str: String): Tuple4[Int, String, String, String] = {
    val id = str.split(",")(0).toInt
    val name = str.split(",")(1)
    val symbole = str.split(",")(2)
    val date = str.split(",")(3)
    (id, name, symbole, date)
  }

  val result1 = lignes.map(x => parseStringWithTuple(x))

But I tested your code and it works fine.

I have been running some tests with your code, using my own BigQuery tables and CSV files, and it worked for me without requiring any additional modification.

I see that your code started working once you changed the case class to a Tuple4, as @jean-marc suggested, which is strange behavior, all the more so because for both him and me your code actually works without any further modification. The error Load configuration must specify at least one source URI usually appears when the load job in BigQuery is not configured correctly and does not receive valid Cloud Storage object URLs. However, if the same code only works after switching to Tuple4, and the CSV file you are using is the same and unchanged (i.e. the URL is valid), this may have been a transient issue, possibly related to Cloud Storage or BigQuery rather than to the Dataproc job itself.
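As a quick check of that hypothesis, the intermediate output location can be inspected directly. This is a minimal diagnostic sketch (it assumes the same conf and outPutGcsPath values as in the question, and has to run before the temporary path is cleaned up at the end of the job); it lists the files that the indirect output format stages on GCS for the BigQuery load job:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // List the temporary GCS path that the BigQuery load job will use as its
    // source URIs; if nothing is staged here, the load has no files to import.
    val tmpPath = new Path(outPutGcsPath)
    val fs: FileSystem = tmpPath.getFileSystem(conf)

    if (fs.exists(tmpPath)) {
      fs.listStatus(tmpPath).foreach(status => println(status.getPath))
    } else {
      println(s"No intermediate output found under $outPutGcsPath")
    }

An empty or missing path would point to a Cloud Storage/staging problem rather than to the Spark code itself.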

Finally, given that this issue seems to be specific to you (the same code has worked for at least two other users), once you have checked that there is nothing wrong with the Cloud Storage objects themselves (permissions, wrong location, etc.), you may want to file a bug in the Public Issue Tracker.