是否可以使用 Spark Structured Streaming 中的 foreachBatch 将两个不相交的数据集写入数据同步?

Is it possible to write two disjoint Datasets to data sync using foreachBatch in Spark Structured Streaming?

我正在尝试将数据从单一来源写入多个数据接收器(Mongo 和 Postgres 数据库)。 传入数据

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic1")
        .load();

Dataset<Row> personalDetails = df.selectExpr("name", "id", "age");


personalDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "PI").save();
    }).start();

Dataset<Row> salDetails = df.selectExpr("basicSal", "bonus");
salDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "SAL").save();
    }).start();

问题是,我可以看到 Spark 打开两个流并读取相同的事件两次。 是否可以读取一次并应用不同的转换并写入不同的集合?

您应该缓存 DataFrame。 见 here:

Write to multiple locations - If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it.

他们的榜样:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

您可以将所有代码放在一个 foreachBatch 中,然后将数据帧写入您的 2 个接收器。您可以通过缓存数据帧,并在此缓存的数据帧上执行 selectExpr 并保存它来做到这一点。

作为旁注 - 请注意,在任何情况下,如果您想要“全有或全无”(即您不希望写信给 mongo 而不是 postgres),你必须只使用一个 foreachBatch,因为否则(如果你有 2 foreachBatch,如你的问题)你有 2 个独立的批次 - 一个可能会失败,而另一个对于相同的数据,一个成功了。

最后我可以使用 Spark 3.1.1Structured Streaming Table API 解决这个问题。

Since Spark 3.1, you can also use DataStreamReader.table() to read tables as streaming DataFrames and use DataStreamWriter.toTable() to write streaming DataFrames as tables:

    // Stream to myTable
    df.writeStream()
      .option("checkpointLocation", "/tmp/test")
      .toTable("myTable");
    // Stream from myTable
    Dataset<Row> tableDf = spark.readStream()
                                .table("myTable");

参考:Spark 3.1 Table API

有了这个,我可以解决从源“kafka”多次读取的问题。有了这个,它将只创建一个 Kafka 消费者并将数据流式传输到 Table。从那里它将使用多个流从 table 读取并在 table 数据集之上应用额外的转换。 所以完整的示例如下所示:

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic1")
        .load();

// Stream to myTable
df.writeStream.option("checkpointLocation", "c:/temp/test").toTable("myTable");

// Stream from myTable
Dataset<Row> tableDf = spark.readStream().table("myTable");

Dataset<Row> personalDetails = tableDf.selectExpr("name", "id", "age");


personalDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "PI").save();
    }).start();

Dataset<Row> salDetails = tableDf.selectExpr("basicSal", "bonus");
salDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "SAL").save();
    }).start();