如何在 Spark 结构化流中将两个流 df 写入 MySQL 中的两个不同表?

How to write two streaming df's into two different tables in MySQL in Spark sturctured streaming?

我使用的是spark 2.3.2版本

我在 spark 结构化流中编写了代码,将流式数据帧数据插入到两个不同的 MySQL table 中。

假设有两个流 df:DF1、DF2。

我已经使用 foreachWriter API 编写了两个查询 (query1,query2),分别从不同的流写入 MySQL tables。 IE。 DF1 进入 MYSQLtable A 和 DF2 进入 MYSQL table B.

当我 运行 spark 作业时,首先是 运行s query1,然后是 query2,所以它写入 table A 但不写入 table B。

如果我先将代码更改为 运行 query2,然后再更改 query1,它会写入 table B 但不会写入 table A。

所以我知道它正在执行第一个查询只是为了写入 table。

注意:我试过分别给两个table不同的MySQL user/database。但是运气不好。

谁能指点一下?如何让它发挥作用。

我的代码如下:

import java.sql._

class  JDBCSink1(url:String, user:String, pwd:String) extends ForeachWriter[org.apache.spark.sql.Row] {
      val driver = "com.mysql.jdbc.Driver"
      var connection:Connection = _
      var statement:Statement = _
      
    def open(partitionId: Long,version: Long): Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(url, user, pwd)
        statement = connection.createStatement
        true
      }

      def process(value: (org.apache.spark.sql.Row)): Unit = {

        val insertSql = """ INSERT INTO tableA(col1,col2,col3) VALUES(?,?,?); """
        val preparedStmt: PreparedStatement = connection.prepareStatement(insertSql)
        preparedStmt.setString (1, value(0).toString)
        preparedStmt.setString (2, value(1).toString)
        preparedStmt.setString (3, value(2).toString)
        preparedStmt.execute
      }

      def close(errorOrNull: Throwable): Unit = {
        connection.close
      }
   }



class  JDBCSink2(url:String, user:String, pwd:String) extends ForeachWriter[org.apache.spark.sql.Row] {
      val driver = "com.mysql.jdbc.Driver"
      var connection:Connection = _
      var statement:Statement = _
      
    def open(partitionId: Long,version: Long): Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(url, user, pwd)
        statement = connection.createStatement
        true
      }

      def process(value: (org.apache.spark.sql.Row)): Unit = {

        val insertSql = """ INSERT INTO tableB(col1,col2) VALUES(?,?); """
        val preparedStmt: PreparedStatement = connection.prepareStatement(insertSql)
        preparedStmt.setString (1, value(0).toString)
        preparedStmt.setString (2, value(1).toString)
        preparedStmt.execute
      }

      def close(errorOrNull: Throwable): Unit = {
        connection.close
      }
   }



val url1="jdbc:mysql://hostname:3306/db1"
val url2="jdbc:mysql://hostname:3306/db2"

val user1 ="usr1"
val user2="usr2"
val pwd = "password"

val Writer1 = new JDBCSink1(url1,user1, pwd)

val Writer2 = new JDBCSink2(url2,user2, pwd)


val query2 =
  streamDF2
    .writeStream
    .foreach(Writer2)
    .outputMode("append")
    .trigger(ProcessingTime("35 seconds"))
    .start().awaitTermination()



val query1 =
  streamDF1
    .writeStream
    .foreach(Writer1)
    .outputMode("append")
    .trigger(ProcessingTime("30 seconds"))
    .start().awaitTermination()

由于 awaitTermination,您阻止了第二个查询。如果你想有两个输出流,你需要在等待它们终止之前启动它们:

val query2 =
  streamDF2
    .writeStream
    .foreach(Writer2)
    .outputMode("append")
    .trigger(ProcessingTime("35 seconds"))
    .start()

val query1 =
  streamDF1
    .writeStream
    .foreach(Writer1)
    .outputMode("append")
    .trigger(ProcessingTime("30 seconds"))
    .start()

query1.awaitTermination()
query2.awaitTermination()

编辑:

Spark 还允许您为 Scheduling within an application 中所述的不同流式查询安排和分配资源。您可以根据

配置您的池
  • 调度模式:可以是FIFOFAIR
  • weight: "这控制了池相对于其他池在集群中的份额。默认情况下,所有池的权重都是 1。如果你给特定的池一个权重2,例如,它将获得比其他活动池多 2 倍的资源。"
  • minShare: "除了总权重外,每个池都可以被赋予管理员希望的最小份额(CPU 核心数)有。

可以通过创建一个 XML 文件来设置池配置,类似于 conf/fairscheduler.xml.template,然后将名为 fairscheduler.xml 的文件放在类路径中,或者设置 spark.scheduler.allocation.file 属性 在您的 SparkConf 中。

conf.set("spark.scheduler.allocation.file", "/path/to/file")

应用不同的池可以像下面这样完成:

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")

// In the above example you could then tell Spark to make use of the pools
val query1 = streamDF1.writeStream.[...].start(pool1)
val query2 = streamDF2.writeStream.[...].start(pool2)