Spark Streaming 中的 groupby 理想策略
groupby ideal strategy in Spark Streaming
我正在使用 Kafka 源中的 Spark Streaming 读取数据,我从那里创建了一个包含列 wsid
、year
、month
、day
、oneHourPrecip
:
val df = spark.readStream
.format("kafka")
.option("subscribe", "raw_weather")
.option("kafka.bootstrap.servers", "<host1:port1,host2:port2>...")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism" , "PLAIN")
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some password>" + "\";")
.option("kafka.ssl.protocol", "TLSv1.2")
.option("kafka.ssl.enabled.protocols", "TLSv1.2")
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
.load()
.selectExpr("CAST(value as STRING)")
.as[String]
.withColumn("_tmp", split(col("value"), "\,"))
.select(
$"_tmp".getItem(0).as("wsid"),
$"_tmp".getItem(1).as("year").cast("int"),
$"_tmp".getItem(2).as("month").cast("int"),
$"_tmp".getItem(3).as("day").cast("int"),
$"_tmp".getItem(11).as("oneHourPrecip").cast("double")
)
.drop("_tmp")
然后我执行 groupby,然后尝试使用 JDBC 将此流数据写入 table。为此,这是我的代码:
val query= df.writeStream
.outputMode(OutputMode.Append())
.foreachBatch((df: DataFrame , id: Long) => {
println(df.count())
df.groupBy($"wsid" , $"year" , $"month" , $"day")
.agg(sum($"oneHourPrecip").as("precipitation"))
.write
.mode(SaveMode.Append)
.jdbc(url , s"$schema.$table" , getProperties)
})
.trigger(Trigger.ProcessingTime(1))
.start()
问题出在批次上。使用 Spark Streaming,我们无法预测数据帧中每个批次的行数。所以很多时候,我得到的数据是脱节的(即对于给定的公共值 (wsid,year,month,day)
,一些行出现在一个批次中,而另一些行出现在另一批次中)。
然后当我分组并尝试使用 JDBC 编写它时,这是我得到的错误:
com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][4.25.13] Batch failure. The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
at com.ibm.db2.jcc.am.b6.a(b6.java:502)
at com.ibm.db2.jcc.am.Agent.endBatchedReadChain(Agent.java:434)
at com.ibm.db2.jcc.am.k4.a(k4.java:5452)
at com.ibm.db2.jcc.am.k4.c(k4.java:5026)
at com.ibm.db2.jcc.am.k4.executeBatch(k4.java:3058)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: com.ibm.db2.jcc.am.SqlIntegrityConstraintViolationException: Error for batch element #1: DB2 SQL Error: SQLCODE=-803, SQLSTATE=23505, SQLERRMC=1;SPARK.DAILY_PRECIPITATION_DATA, DRIVER=4.25.13
at com.ibm.db2.jcc.am.b6.a(b6.java:806)
at com.ibm.db2.jcc.am.b6.a(b6.java:66)
at com.ibm.db2.jcc.am.b6.a(b6.java:140)
at com.ibm.db2.jcc.t4.ab.a(ab.java:1283)
at com.ibm.db2.jcc.t4.ab.a(ab.java:128)
at com.ibm.db2.jcc.t4.p.a(p.java:57)
at com.ibm.db2.jcc.t4.aw.a(aw.java:225)
at com.ibm.db2.jcc.am.k4.a(k4.java:3605)
at com.ibm.db2.jcc.am.k4.d(k4.java:6020)
at com.ibm.db2.jcc.am.k4.a(k4.java:5372)
... 17 more
从上面的SqlIntegrityConstraintViolationException
可以明显看出,这是因为在一批使用JDBC写入groupby
ed值后,下一组值的插入失败,因为主键 (wsid,year,month,day)
.
鉴于源中给定的 (wsid,year,month,day)
将有固定数量的 oneHourPrecip
值 (24),我们如何确保 groupBy 对从中流式传输的所有数据正常工作源,所以插入数据库不是问题?
SaveMode.Upsert
不可用:-)
与groupBy
无关。 group by 只是对值进行分组。完整性违规(com.ibm.db2.jcc.am.SqlIntegrityConstraintViolationException
)你需要在 sql 级别小心。
选项 1:
您可以执行插入更新以避免违反完整性。
为此,您需要使用如下伪代码...
dataframe.foreachPartition {
update TABLE_NAME set FIELD_NAME=xxxxx where MyID=XXX;
INSERT INTO TABLE_NAME values (colid,col1,col2)
WHERE NOT EXISTS(select 1 from TABLE_NAME where colid=xxxx);
}
选项 2:
或者检查 db2
中的 merge statement
一种方法是创建一个具有相同模式的空临时 table(没有任何约束)并填充它,最后您可以执行一个脚本,该脚本将合并到目标中
table.
我确实想出了一些办法,但这可能会带来一些性能问题。无论如何,它对我有用,所以我发布了答案:
我发现,为了将 groupby
ed 数据存储到 DB2 table,我们必须等到从源中检索到所有数据。为此,我使用 OutputMode.Complete()
。
然后我意识到如果我在当前方法中分组后将其写入DB2,它仍然会抛出同样的错误。为此,我不得不在 foreachBatch
.
中使用 SaveMode.Overwrite
我用这种方法尝试了 运行 我的程序,但它抛出了这个错误:
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets
所以我决定在 readStream
期间进行分组和聚合。因此我的代码看起来像这样:
readStream
部分:
val df = spark.readStream
.format("kafka")
.option("subscribe", "raw_weather")
.option("kafka.bootstrap.servers", "<host1:port1,host2:port2>...")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism" , "PLAIN")
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some password>" + "\";")
.option("kafka.ssl.protocol", "TLSv1.2")
.option("kafka.ssl.enabled.protocols", "TLSv1.2")
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
.load()
.selectExpr("CAST(value as STRING)")
.as[String]
.withColumn("_tmp", split(col("value"), "\,"))
.select(
$"_tmp".getItem(0).as("wsid"),
$"_tmp".getItem(1).as("year").cast("int"),
$"_tmp".getItem(2).as("month").cast("int"),
$"_tmp".getItem(3).as("day").cast("int"),
$"_tmp".getItem(11).as("oneHourPrecip").cast("double")
)
.drop("_tmp")
.groupBy($"wsid" , $"year" , $"month" , $"day")
.agg(sum($"oneHourPrecip").as("precipitation"))
writeStream
部分:
val query= df.writeStream
.outputMode(OutputMode.Complete())
.foreachBatch((df: DataFrame , id: Long) => {
println(df.count())
df.write
.mode(SaveMode.Overwrite)
.jdbc(url , s"$schema.$table" , getProperties)
})
.trigger(Trigger.ProcessingTime(1))
.start()
query.awaitTermination()
我正在使用 Kafka 源中的 Spark Streaming 读取数据,我从那里创建了一个包含列 wsid
、year
、month
、day
、oneHourPrecip
:
val df = spark.readStream
.format("kafka")
.option("subscribe", "raw_weather")
.option("kafka.bootstrap.servers", "<host1:port1,host2:port2>...")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism" , "PLAIN")
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some password>" + "\";")
.option("kafka.ssl.protocol", "TLSv1.2")
.option("kafka.ssl.enabled.protocols", "TLSv1.2")
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
.load()
.selectExpr("CAST(value as STRING)")
.as[String]
.withColumn("_tmp", split(col("value"), "\,"))
.select(
$"_tmp".getItem(0).as("wsid"),
$"_tmp".getItem(1).as("year").cast("int"),
$"_tmp".getItem(2).as("month").cast("int"),
$"_tmp".getItem(3).as("day").cast("int"),
$"_tmp".getItem(11).as("oneHourPrecip").cast("double")
)
.drop("_tmp")
然后我执行 groupby,然后尝试使用 JDBC 将此流数据写入 table。为此,这是我的代码:
val query= df.writeStream
.outputMode(OutputMode.Append())
.foreachBatch((df: DataFrame , id: Long) => {
println(df.count())
df.groupBy($"wsid" , $"year" , $"month" , $"day")
.agg(sum($"oneHourPrecip").as("precipitation"))
.write
.mode(SaveMode.Append)
.jdbc(url , s"$schema.$table" , getProperties)
})
.trigger(Trigger.ProcessingTime(1))
.start()
问题出在批次上。使用 Spark Streaming,我们无法预测数据帧中每个批次的行数。所以很多时候,我得到的数据是脱节的(即对于给定的公共值 (wsid,year,month,day)
,一些行出现在一个批次中,而另一些行出现在另一批次中)。
然后当我分组并尝试使用 JDBC 编写它时,这是我得到的错误:
com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][4.25.13] Batch failure. The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
at com.ibm.db2.jcc.am.b6.a(b6.java:502)
at com.ibm.db2.jcc.am.Agent.endBatchedReadChain(Agent.java:434)
at com.ibm.db2.jcc.am.k4.a(k4.java:5452)
at com.ibm.db2.jcc.am.k4.c(k4.java:5026)
at com.ibm.db2.jcc.am.k4.executeBatch(k4.java:3058)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: com.ibm.db2.jcc.am.SqlIntegrityConstraintViolationException: Error for batch element #1: DB2 SQL Error: SQLCODE=-803, SQLSTATE=23505, SQLERRMC=1;SPARK.DAILY_PRECIPITATION_DATA, DRIVER=4.25.13
at com.ibm.db2.jcc.am.b6.a(b6.java:806)
at com.ibm.db2.jcc.am.b6.a(b6.java:66)
at com.ibm.db2.jcc.am.b6.a(b6.java:140)
at com.ibm.db2.jcc.t4.ab.a(ab.java:1283)
at com.ibm.db2.jcc.t4.ab.a(ab.java:128)
at com.ibm.db2.jcc.t4.p.a(p.java:57)
at com.ibm.db2.jcc.t4.aw.a(aw.java:225)
at com.ibm.db2.jcc.am.k4.a(k4.java:3605)
at com.ibm.db2.jcc.am.k4.d(k4.java:6020)
at com.ibm.db2.jcc.am.k4.a(k4.java:5372)
... 17 more
从上面的SqlIntegrityConstraintViolationException
可以明显看出,这是因为在一批使用JDBC写入groupby
ed值后,下一组值的插入失败,因为主键 (wsid,year,month,day)
.
鉴于源中给定的 (wsid,year,month,day)
将有固定数量的 oneHourPrecip
值 (24),我们如何确保 groupBy 对从中流式传输的所有数据正常工作源,所以插入数据库不是问题?
SaveMode.Upsert
不可用:-)
与groupBy
无关。 group by 只是对值进行分组。完整性违规(com.ibm.db2.jcc.am.SqlIntegrityConstraintViolationException
)你需要在 sql 级别小心。
选项 1:
您可以执行插入更新以避免违反完整性。
为此,您需要使用如下伪代码...
dataframe.foreachPartition {
update TABLE_NAME set FIELD_NAME=xxxxx where MyID=XXX;
INSERT INTO TABLE_NAME values (colid,col1,col2)
WHERE NOT EXISTS(select 1 from TABLE_NAME where colid=xxxx);
}
选项 2: 或者检查 db2
中的 merge statement一种方法是创建一个具有相同模式的空临时 table(没有任何约束)并填充它,最后您可以执行一个脚本,该脚本将合并到目标中 table.
我确实想出了一些办法,但这可能会带来一些性能问题。无论如何,它对我有用,所以我发布了答案:
我发现,为了将 groupby
ed 数据存储到 DB2 table,我们必须等到从源中检索到所有数据。为此,我使用 OutputMode.Complete()
。
然后我意识到如果我在当前方法中分组后将其写入DB2,它仍然会抛出同样的错误。为此,我不得不在 foreachBatch
.
SaveMode.Overwrite
我用这种方法尝试了 运行 我的程序,但它抛出了这个错误:
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets
所以我决定在 readStream
期间进行分组和聚合。因此我的代码看起来像这样:
readStream
部分:
val df = spark.readStream
.format("kafka")
.option("subscribe", "raw_weather")
.option("kafka.bootstrap.servers", "<host1:port1,host2:port2>...")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism" , "PLAIN")
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some password>" + "\";")
.option("kafka.ssl.protocol", "TLSv1.2")
.option("kafka.ssl.enabled.protocols", "TLSv1.2")
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
.load()
.selectExpr("CAST(value as STRING)")
.as[String]
.withColumn("_tmp", split(col("value"), "\,"))
.select(
$"_tmp".getItem(0).as("wsid"),
$"_tmp".getItem(1).as("year").cast("int"),
$"_tmp".getItem(2).as("month").cast("int"),
$"_tmp".getItem(3).as("day").cast("int"),
$"_tmp".getItem(11).as("oneHourPrecip").cast("double")
)
.drop("_tmp")
.groupBy($"wsid" , $"year" , $"month" , $"day")
.agg(sum($"oneHourPrecip").as("precipitation"))
writeStream
部分:
val query= df.writeStream
.outputMode(OutputMode.Complete())
.foreachBatch((df: DataFrame , id: Long) => {
println(df.count())
df.write
.mode(SaveMode.Overwrite)
.jdbc(url , s"$schema.$table" , getProperties)
})
.trigger(Trigger.ProcessingTime(1))
.start()
query.awaitTermination()