Spark Streaming 中的 groupby 理想策略

groupby ideal strategy in Spark Streaming

我正在使用 Kafka 源中的 Spark Streaming 读取数据,我从那里创建了一个包含列 wsidyearmonthdayoneHourPrecip:

val df = spark.readStream
    .format("kafka")
    .option("subscribe", "raw_weather")
    .option("kafka.bootstrap.servers", "<host1:port1,host2:port2>...")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism" , "PLAIN")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some password>" + "\";")
    .option("kafka.ssl.protocol", "TLSv1.2")
    .option("kafka.ssl.enabled.protocols", "TLSv1.2")
    .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
    .load()
    .selectExpr("CAST(value as STRING)")
    .as[String]
    .withColumn("_tmp", split(col("value"), "\,"))
    .select(
        $"_tmp".getItem(0).as("wsid"),
        $"_tmp".getItem(1).as("year").cast("int"),
        $"_tmp".getItem(2).as("month").cast("int"),
        $"_tmp".getItem(3).as("day").cast("int"),
        $"_tmp".getItem(11).as("oneHourPrecip").cast("double")
    )
    .drop("_tmp")

然后我执行 groupby,然后尝试使用 JDBC 将此流数据写入 table。为此,这是我的代码:

val query= df.writeStream
    .outputMode(OutputMode.Append())
    .foreachBatch((df: DataFrame , id: Long) => {
        println(df.count())
        df.groupBy($"wsid" , $"year" , $"month" , $"day")
            .agg(sum($"oneHourPrecip").as("precipitation"))
            .write
            .mode(SaveMode.Append)
            .jdbc(url , s"$schema.$table" , getProperties)
    })
    .trigger(Trigger.ProcessingTime(1))
    .start()

问题出在批次上。使用 Spark Streaming,我们无法预测数据帧中每个批次的行数。所以很多时候,我得到的数据是脱节的(即对于给定的公共值 (wsid,year,month,day),一些行出现在一个批次中,而另一些行出现在另一批次中)。

然后当我分组并尝试使用 JDBC 编写它时,这是我得到的错误:

com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][4.25.13] Batch failure.  The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
    at com.ibm.db2.jcc.am.b6.a(b6.java:502)
    at com.ibm.db2.jcc.am.Agent.endBatchedReadChain(Agent.java:434)
    at com.ibm.db2.jcc.am.k4.a(k4.java:5452)
    at com.ibm.db2.jcc.am.k4.c(k4.java:5026)
    at com.ibm.db2.jcc.am.k4.executeBatch(k4.java:3058)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: com.ibm.db2.jcc.am.SqlIntegrityConstraintViolationException: Error for batch element #1: DB2 SQL Error: SQLCODE=-803, SQLSTATE=23505, SQLERRMC=1;SPARK.DAILY_PRECIPITATION_DATA, DRIVER=4.25.13
        at com.ibm.db2.jcc.am.b6.a(b6.java:806)
        at com.ibm.db2.jcc.am.b6.a(b6.java:66)
        at com.ibm.db2.jcc.am.b6.a(b6.java:140)
        at com.ibm.db2.jcc.t4.ab.a(ab.java:1283)
        at com.ibm.db2.jcc.t4.ab.a(ab.java:128)
        at com.ibm.db2.jcc.t4.p.a(p.java:57)
        at com.ibm.db2.jcc.t4.aw.a(aw.java:225)
        at com.ibm.db2.jcc.am.k4.a(k4.java:3605)
        at com.ibm.db2.jcc.am.k4.d(k4.java:6020)
        at com.ibm.db2.jcc.am.k4.a(k4.java:5372)
        ... 17 more

从上面的SqlIntegrityConstraintViolationException可以明显看出,这是因为在一批使用JDBC写入groupbyed值后,下一组值的插入失败,因为主键 (wsid,year,month,day).

鉴于源中给定的 (wsid,year,month,day) 将有固定数量的 oneHourPrecip 值 (24),我们如何确保 groupBy 对从中流式传输的所有数据正常工作源,所以插入数据库不是问题?

SaveMode.Upsert 不可用:-) 与groupBy无关。 group by 只是对值进行分组。完整性违规(com.ibm.db2.jcc.am.SqlIntegrityConstraintViolationException)你需要在 sql 级别小心。

选项 1:

您可以执行插入更新以避免违反完整性。

为此,您需要使用如下伪代码...

dataframe.foreachPartition {

update TABLE_NAME set FIELD_NAME=xxxxx where MyID=XXX;

INSERT INTO TABLE_NAME values (colid,col1,col2) 
WHERE NOT EXISTS(select 1 from TABLE_NAME where colid=xxxx);
}

选项 2: 或者检查 db2

中的 merge statement

一种方法是创建一个具有相同模式的空临时 table(没有任何约束)并填充它,最后您可以执行一个脚本,该脚本将合并到目标中 table.

我确实想出了一些办法,但这可能会带来一些性能问题。无论如何,它对我有用,所以我发布了答案:

我发现,为了将 groupbyed 数据存储到 DB2 table,我们必须等到从源中检索到所有数据。为此,我使用 OutputMode.Complete()

然后我意识到如果我在当前方法中分组后将其写入DB2,它仍然会抛出同样的错误。为此,我不得不在 foreachBatch.

中使用 SaveMode.Overwrite

我用这种方法尝试了 运行 我的程序,但它抛出了这个错误:

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets

所以我决定在 readStream 期间进行分组和聚合。因此我的代码看起来像这样:

readStream部分:

val df = spark.readStream
    .format("kafka")
    .option("subscribe", "raw_weather")
    .option("kafka.bootstrap.servers", "<host1:port1,host2:port2>...")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism" , "PLAIN")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some password>" + "\";")
    .option("kafka.ssl.protocol", "TLSv1.2")
    .option("kafka.ssl.enabled.protocols", "TLSv1.2")
    .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
    .load()
    .selectExpr("CAST(value as STRING)")
    .as[String]
    .withColumn("_tmp", split(col("value"), "\,"))
    .select(
        $"_tmp".getItem(0).as("wsid"),
        $"_tmp".getItem(1).as("year").cast("int"),
        $"_tmp".getItem(2).as("month").cast("int"),
        $"_tmp".getItem(3).as("day").cast("int"),
        $"_tmp".getItem(11).as("oneHourPrecip").cast("double")
    )
    .drop("_tmp")
    .groupBy($"wsid" , $"year" , $"month" , $"day")
    .agg(sum($"oneHourPrecip").as("precipitation"))

writeStream部分:

val query= df.writeStream
    .outputMode(OutputMode.Complete())
    .foreachBatch((df: DataFrame , id: Long) => {
        println(df.count())
        df.write
            .mode(SaveMode.Overwrite)
            .jdbc(url , s"$schema.$table" , getProperties)
    })
    .trigger(Trigger.ProcessingTime(1))
    .start()

query.awaitTermination()