从 Spark 读取时分区 sql table 数据出现问题

Issue with partioning sql table data when reading from Spark

我编写了一个 Scala 程序,用于从 MS SQL 服务器加载数据并将其写入 BigQuery。我在 Spark 集群 (Google Dataproc) 中执行此操作。我的问题是,即使我有一个具有 64 个内核的集群,并且在 运行 作业时指定了执行程序参数,并且我对正在读取的数据进行了分区,但 Spark 仅从单个执行程序读取数据。当我开始工作时,我可以看到所有执行程序都在启动,在 SQL 服务器上我可以看到所有 4 个工作人员的连接,但在一分钟内,他们都再次关闭,只留下一个,然后 运行s 一个多小时才结束。

数据集是6500万条记录,我想把它分成60个分区。

这是我的集群:

    gcloud dataproc clusters create my-cluster \
  --properties dataproc:dataproc.conscrypt.provider.enable=false,spark:spark.executor.userClassPathFirst=true,spark:spark.driver.userClassPathFirst=true \
  --region europe-north1 \
  --subnet my-subnet \
  --master-machine-type n1-standard-4 \
  --worker-machine-type n1-highmem-16 \
  --master-boot-disk-size 15GB \
  --worker-boot-disk-size 500GB \
  --image-version 1.4 \
  --master-boot-disk-type=pd-ssd \
  --worker-boot-disk-type=pd-ssd \
  --num-worker-local-ssds=1 \
  --num-workers=4

这就是我 运行 工作的方式:

    gcloud dataproc jobs submit spark \
--cluster my-cluster \
--region europe-north1 \
--jars gs://mybucket/mycode.jar,gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar \
--class Main \
--properties \
spark.executor.memory=19g, \
spark.executor.cores=4, \
spark.executor.instances=11 \
-- yarn

这是我用来读取数据的代码:

val data = sqlQuery(ss,
                    serverName,
                    portNumber,
                    databaseName,
                    userName,
                    password,
                    tableName)

writeToBigQuery(
      bqConfig,
      data,
      dataSetName,
      replaceInvalidCharactersInTableName(r.getAs[String]("TableName")),
      "WRITE_TRUNCATE")

def sqlQuery(ss: SparkSession,
             hostName: String,
             port: String,
             databaseName: String,
             user: String,
             password: String,
             query: String): DataFrame = {
  val result = ss.read.format("jdbc")
    .option("url", getJdbcUrl(hostName, port, databaseName))
    .option("dbtable", query)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("numPartitions", 60)
    .option("partitionColumn", "entityid")
    .option("lowerBound", 1)
    .option("upperBound", 198012).load()

  result
}

def writeToBigQuery(bqConf: Configuration,
                    df: DataFrame,
                    dataset: String,
                    table: String,
                    writeDisposition: String = "WRITE_APPEND"): Unit = {

  //Convert illegal characters in column names
  var legalColumnNamesDf = df
  for (col <- df.columns) {
    legalColumnNamesDf = legalColumnNamesDf.withColumnRenamed(
      col,
      col
        .replaceAll("-", "_")
        .replaceAll("\s", "_")
        .replaceAll("æ", "ae")
        .replaceAll("ø", "oe")
        .replaceAll("å", "aa")
        .replaceAll("Æ", "AE")
        .replaceAll("Ø", "OE")
        .replaceAll("Å", "AA")
    )
  }

  val outputGcsPath = s"gs://$bucket/" + HardcodedValues.SparkTempFolderRelativePath + UUID
    .randomUUID()
    .toString
  val outputTableId = s"$projectId:$dataset.$table"

  //Apply explicit schema since to avoid creativity of BigQuery auto config
  val uniqBqConf = new Configuration(bqConf)

  BigQueryOutputConfiguration.configure(
    uniqBqConf,
    outputTableId,
    s"""{"fields":${Json(DefaultFormats).write(
      legalColumnNamesDf.schema.map(
        f =>
          Map(
            "name" -> f.name,
            "type" -> f.dataType.sql
              .replace("BIGINT", "INT")
              .replace("INT", "INT64")
              .replaceAll("DECIMAL\(\d+,\d+\)", "NUMERIC"),
            "mode" -> (if (f.nullable) "NULLABLE"
                       else "REQUIRED")
        ))
    )} }""",
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_, _]]
  )

  uniqBqConf.set(
    BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
    if (Array("WRITE_APPEND", "WRITE_TRUNCATE") contains writeDisposition)
      writeDisposition
    else "WRITE_APPEND"
  )

  //Save to BigQuery
  legalColumnNamesDf.rdd
    .map(
      row =>
        (null,
         Json(DefaultFormats).write(
           ListMap(row.schema.fieldNames.toSeq.zip(row.toSeq): _*))))
    .saveAsNewAPIHadoopDataset(uniqBqConf)

}

如有任何想法,我们将不胜感激。

如果您查看 Spark UI,是否存在一个任务读取大部分数据的严重偏差?我的猜测是您选择了一个糟糕的分区键,所以大部分数据最终都在一个分区中。

这个 Whosebug 答案提供了详细的解释:。我认为您的实体 ID 需要在 1 到 198012 之间均匀分布,才能成为一个很好的分区列。

最后我试着停下来告诉 spark 有多少执行者到 运行 并且只做动态分配,现在它起作用了。我要求 24 个分区,它动态分配 8 个执行器,每个执行器有 3 个内核,运行并行执行 24 个任务。