是否可以使用 Spark Structured Streaming 中的 foreachBatch 将两个不相交的数据集写入数据同步?
Is it possible to write two disjoint Datasets to data sync using foreachBatch in Spark Structured Streaming?
我正在尝试将数据从单一来源写入多个数据接收器(Mongo 和 Postgres 数据库)。
传入数据
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
Dataset<Row> personalDetails = df.selectExpr("name", "id", "age");
personalDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "PI").save();
}).start();
Dataset<Row> salDetails = df.selectExpr("basicSal", "bonus");
salDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "SAL").save();
}).start();
问题是,我可以看到 Spark 打开两个流并读取相同的事件两次。
是否可以读取一次并应用不同的转换并写入不同的集合?
您应该缓存 DataFrame。
见 here:
Write to multiple locations - If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it.
他们的榜样:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
您可以将所有代码放在一个 foreachBatch
中,然后将数据帧写入您的 2 个接收器。您可以通过缓存数据帧,并在此缓存的数据帧上执行 selectExpr
并保存它来做到这一点。
作为旁注 - 请注意,在任何情况下,如果您想要“全有或全无”(即您不希望写信给 mongo 而不是 postgres),你必须只使用一个 foreachBatch
,因为否则(如果你有 2 foreachBatch
,如你的问题)你有 2 个独立的批次 - 一个可能会失败,而另一个对于相同的数据,一个成功了。
最后我可以使用 Spark 3.1.1 和 Structured Streaming Table API 解决这个问题。
Since Spark 3.1, you can also use DataStreamReader.table() to read tables as streaming DataFrames and use DataStreamWriter.toTable() to write streaming DataFrames as tables:
// Stream to myTable
df.writeStream()
.option("checkpointLocation", "/tmp/test")
.toTable("myTable");
// Stream from myTable
Dataset<Row> tableDf = spark.readStream()
.table("myTable");
有了这个,我可以解决从源“kafka”多次读取的问题。有了这个,它将只创建一个 Kafka 消费者并将数据流式传输到 Table。从那里它将使用多个流从 table 读取并在 table 数据集之上应用额外的转换。
所以完整的示例如下所示:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
// Stream to myTable
df.writeStream.option("checkpointLocation", "c:/temp/test").toTable("myTable");
// Stream from myTable
Dataset<Row> tableDf = spark.readStream().table("myTable");
Dataset<Row> personalDetails = tableDf.selectExpr("name", "id", "age");
personalDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "PI").save();
}).start();
Dataset<Row> salDetails = tableDf.selectExpr("basicSal", "bonus");
salDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "SAL").save();
}).start();
我正在尝试将数据从单一来源写入多个数据接收器(Mongo 和 Postgres 数据库)。 传入数据
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
Dataset<Row> personalDetails = df.selectExpr("name", "id", "age");
personalDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "PI").save();
}).start();
Dataset<Row> salDetails = df.selectExpr("basicSal", "bonus");
salDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "SAL").save();
}).start();
问题是,我可以看到 Spark 打开两个流并读取相同的事件两次。 是否可以读取一次并应用不同的转换并写入不同的集合?
您应该缓存 DataFrame。 见 here:
Write to multiple locations - If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it.
他们的榜样:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
您可以将所有代码放在一个 foreachBatch
中,然后将数据帧写入您的 2 个接收器。您可以通过缓存数据帧,并在此缓存的数据帧上执行 selectExpr
并保存它来做到这一点。
作为旁注 - 请注意,在任何情况下,如果您想要“全有或全无”(即您不希望写信给 mongo 而不是 postgres),你必须只使用一个 foreachBatch
,因为否则(如果你有 2 foreachBatch
,如你的问题)你有 2 个独立的批次 - 一个可能会失败,而另一个对于相同的数据,一个成功了。
最后我可以使用 Spark 3.1.1 和 Structured Streaming Table API 解决这个问题。
Since Spark 3.1, you can also use DataStreamReader.table() to read tables as streaming DataFrames and use DataStreamWriter.toTable() to write streaming DataFrames as tables:
// Stream to myTable
df.writeStream()
.option("checkpointLocation", "/tmp/test")
.toTable("myTable");
// Stream from myTable
Dataset<Row> tableDf = spark.readStream()
.table("myTable");
有了这个,我可以解决从源“kafka”多次读取的问题。有了这个,它将只创建一个 Kafka 消费者并将数据流式传输到 Table。从那里它将使用多个流从 table 读取并在 table 数据集之上应用额外的转换。 所以完整的示例如下所示:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
// Stream to myTable
df.writeStream.option("checkpointLocation", "c:/temp/test").toTable("myTable");
// Stream from myTable
Dataset<Row> tableDf = spark.readStream().table("myTable");
Dataset<Row> personalDetails = tableDf.selectExpr("name", "id", "age");
personalDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "PI").save();
}).start();
Dataset<Row> salDetails = tableDf.selectExpr("basicSal", "bonus");
salDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "SAL").save();
}).start();