如何使用 PySpark 中的 foreach 或 foreachBatch 写入数据库?

How to use foreach or foreachBatch in PySpark to write to database?

我想使用 Python (PySpark) 从 Kafka 源到 MariaDB 进行 Spark Structured Streaming (Spark 2.4.x)。

我想使用流式 Spark 数据帧而不是静态数据帧或 Pandas 数据帧。

似乎必须使用 foreachforeachBatch,因为根据 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks.

,流数据帧没有可能的数据库接收器

这是我的尝试:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql import DataFrameWriter
# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user":"writer", "password":"1234"}
# schema
schema_simple = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
df = spark.readStream \
  .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "mytopic") \
  .load() \
  .selectExpr("Timestamp", "cast (value as string) as json") \
  .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
  .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()
# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)

def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreach(process_row).start()

我收到一个错误:

AttributeError: write

为什么?

tl;drforeach 替换为 foreachBatch


引用 official documentation:

The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.

换句话说,您的 writeStream.foreach(process_row) 作用于没有 write.jdbc 可用的单行(数据),因此出现错误。

将行视为一段数据,您可以使用任何 API 将其保存在任何地方。

如果您真的需要 Spark 的支持(并且确实使用 write.jdbc),您实际上应该使用 foreachBatch.

while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.

在 Jacek 的支持下,我可以修复我的示例:

def process_row(df, epoch_id):
    df2.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreachBatch(process_row).start()

您还必须将 epoch_id 放入函数参数中。否则,您会在 jupyter notebook 中未显示的 spark 日志文件中收到错误。