从 JDBC 源迁移数据时如何优化分区?

How to optimize partitioning when migrating data from JDBC source?

我正在尝试将数据从 PostgreSQL table 中的 table 移动到 HDFS 上的 Hive table。为此,我想出了以下代码:

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

数据插入到table根据prtn_String_columns: source_system_name, period_year, period_num

动态分区的hive

使用的 Spark 提交:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

执行程序日志中生成以下错误消息:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access0(URLClassLoader.java:73)
    at java.net.URLClassLoader.run(URLClassLoader.java:368)
    at java.net.URLClassLoader.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

我在日志中看到读取正在使用给定数量的分区正确执行,如下所示:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

以下是各阶段执行者的状态:

数据未正确分区。一个分区变小,而另一个变大。这里有一个倾斜问题。 将数据插入 Hive table 时,作业在以下行失败:spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF") 但我知道这是由于数据倾斜问题而发生的。

我尝试增加执行程序的数量,增加执行程序内存、驱动程序内存,尝试只保存为 csv 文件而不是将数据帧保存到 Hive table,但是给出异常不会影响执行:

java.lang.OutOfMemoryError: GC overhead limit exceeded

代码中有什么我需要更正的吗?谁能告诉我如何解决这个问题?

  1. 根据输入数据量和集群资源确定需要多少个分区。根据经验,除非绝对必要,否则最好将分区输入保持在 1GB 以下。并且严格小于块大小限制。

    您已经 previously stated 迁移了在不同帖子中使用的 1TB 数据值 (5 - 70) 可能会降低以确保顺利进行。

    尝试使用不需要进一步 repartitioning.

  2. 的值
  3. 了解您的数据。

    分析数据集中可用的列以确定是否有任何具有高基数和均匀分布的列分布在所需数量的分区中。这些是导入过程的良好候选者。此外,您应该确定准确的值范围。

    具有不同中心性和偏度度量的聚合以及直方图和基本按键计数是很好的探索工具。对于这部分,最好直接在数据库中分析数据,而不是将其提取到 Spark。

    根据 RDBMS,您可以使用 width_bucket(PostgreSQL、Oracle)或等效函数来了解在使用 partitionColumnlowerBound, upperBound, numPartitons.

    s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
    FROM t
    GROUP BY bucket) as tmp)"""
    
  4. 如果没有满足上述条件的列考虑:

    • 创建一个自定义的并通过公开它。一个看法。多个独立列的散列通常是很好的选择。请查阅您的数据库手册以确定可在此处使用的函数(DBMS_CRYPTO 在 Oracle 中,pgcrypto 在 PostgreSQL 中)*.
    • 使用一组独立的列,这些列一起提供足够高的基数。

      可选地,如果您要写入分区的 Hive table,您应该考虑包括 Hive 分区列。它可能会限制以后生成的文件数量。

  5. 准备分区参数

    • 如果在前面的步骤中选择或创建的列是数字 (or date / timestamp in Spark >= 2.4) 直接提供它作为 partitionColumn 并使用之前确定的范围值来填充 lowerBoundupperBound.

      如果绑定值不反映数据的属性(min(col) 代表 lowerBoundmax(col) 代表 upperBound),它可能会导致严重的数据倾斜,因此仔细穿线。在最坏的情况下,当边界不覆盖数据范围时,所有记录将由一台机器获取,这并不比完全不分区好。

    • 如果在前面的步骤中选择的列是分类的或者是一组列,则生成一个完全覆盖数据的互斥谓词列表,在可以在 SQL where 子句中使用的形式。

      例如,如果您的列 A 的值为 {a1a2a3},列 B 的值为 {b1, b2, b3}:

      val predicates = for {
        a <- Seq("a1", "a2", "a3")
        b <- Seq("b1", "b2", "b3")
      } yield s"A = $a AND B = $b"
      

      仔细检查条件是否重叠并且所有组合都已涵盖。如果不满足这些条件,您最终会分别得到重复或缺失的记录。

      将数据作为 predicates 参数传递给 jdbc 调用。请注意,分区数将正好等于谓词数。

  6. 将数据库置于只读模式(任何正在进行的写入都可能导致数据不一致。如果可能,您应该在开始整个过程​​之前锁定数据库,但如果可能的话,在您的组织中).

  7. 如果分区数量与没有repartition所需的输出负载数据匹配,则直接转储到sink,如果不是,您可以尝试按照与步骤1中相同的规则重新分区.

  8. 如果您仍然遇到任何问题,请确保您已正确配置 Spark 内存和 GC 选项。

  9. 如果none以上作品:

    • 考虑将您的数据转储到网络/使用 COPY TO 等工具分配存储并直接从那里读取。

      请注意,您通常需要一个 POSIX 兼容的文件系统或标准数据库实用程序,因此 HDFS 通常不需要。

      这种方式的好处是不用担心列属性,也不需要将数据置于只读模式,保证一致性。

    • 使用专用的批量传输工具,如 Apache Sqoop,然后重塑数据。


* 不要 使用伪列 - Pseudocolumn in Spark JDBC.

你的另一个问题被发送到这里是重复的

 'How to avoid data skewing while reading huge datasets or tables into spark? 
  The data is not being partitioned properly. One partition is smaller while the 
  other one becomes huge on read.
  I observed that one of the partition has nearly 2million rows and 
  while inserting there is a skew in partition. '

如果问题是处理读取后在数据框中分区的数据,您是否尝试过增加 "numPartitions" 值?

.option("numPartitions",50)

lowerBound, upperBound 为生成的 WHERE 子句表达式形成分区步长,numpartitions 决定拆分的数量。

例如,sometable 有列 - ID(我们选择它作为 partitionColumn);我们在 table 中看到的 column-ID 的值范围是从 1 到 1000,我们希望通过 运行 select * from sometable 获取所有记录, 所以我们使用 lowerbound = 1 & upperbound = 1000 和 numpartition = 4

这将根据我们的 feed (lowerbound = 1 & upperbound = 1000 and numpartition = 4)

构建 sql 生成包含每个查询结果的 4 分区数据框
select * from sometable where ID < 250
select * from sometable where ID >= 250 and ID < 500
select * from sometable where ID >= 500 and ID < 750
select * from sometable where ID >= 750

如果我们 table 中的大部分记录都在 ID(500,750) 范围内怎么办?这就是你所处的情况。

当我们增加 numpartition 时,拆分会进一步发生,从而减少同一分区中的记录量,但这 不是一个好镜头。

与其根据我们提供的边界对 partitioncolumn 进行火花拆分,如果您考虑自己进行拆分,数据可以均匀分布 分裂。你需要切换到另一个 JDBC 方法,我们可以提供而不是 (lowerbound,upperbound & numpartition) 直接谓词。

def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame 

Link

根据我的经验,有 4 种不同的内存设置:

A) [1] 处理原因存储数据的内存 VS [2] 存放程序栈的堆 Space

B) [1] Driver VS [2] executor memory

到目前为止,我总是能够通过增加适当类型的内存来成功获得我的 Spark 作业 运行:

因此,A2-B1 将是驱动程序上可用于保存程序堆栈的内存。等等

属性名如下:

A1-B1) executor-memory

A1-B2) driver-memory

A2-B1) spark.yarn.executor.memoryOverhead

A2-B2) spark.yarn.driver.memoryOverhead

请记住,所有 *-B1 的总和必须小于您的工作程序上的可用内存,所有 *-B2 的总和必须小于您的驱动程序节点上的内存。

我敢打赌,罪魁祸首是粗体标记的堆设置之一。