How to use foreach or foreachBatch in PySpark to write to a database?
I want to do Spark Structured Streaming (Spark 2.4.x) from a Kafka source to MariaDB with Python (PySpark).
I want to use the streaming Spark DataFrame, not a static or Pandas DataFrame.
It seems that one has to use foreach or foreachBatch, since there is no database sink for streaming DataFrames according to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks.
Here is my attempt:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql import DataFrameWriter
# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user":"writer", "password":"1234"}
# schema
schema_simple = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])
# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()
# create DataFrame representing the stream
df = spark.readStream \
    .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mytopic") \
    .load() \
    .selectExpr("Timestamp", "cast (value as string) as json") \
    .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
    .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()
# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)
def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreach(process_row).start()
I get an error:
AttributeError: write
Why?
tl;dr Replace foreach with foreachBatch.
The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.
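As a minimal sketch of the difference (the handler names are illustrative, not part of the question; df2 is the streaming DataFrame from the question):

# foreach: the callback is invoked with one pyspark.sql.Row at a time
def handle_row(row):
    print(row.Signal, row.Value)   # a Row exposes its fields, but has no .write

# foreachBatch: the callback is invoked with a regular (non-streaming)
# DataFrame holding one micro-batch, plus that batch's epoch id
def handle_batch(batch_df, epoch_id):
    batch_df.show()                # batch_df is a plain DataFrame, so .write works

query = df2.writeStream.foreachBatch(handle_batch).start()  # or .foreach(handle_row)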
So your writeStream.foreach(process_row) acts on a single row (of data) that has no write.jdbc available, hence the error.
Think of a row as one piece of data that you can save anywhere you want, using any API you want.
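If per-row writes are really what you want, a sketch along these lines would work; mysql-connector-python is an assumption (any Python DB client does), and opening a connection per row is shown only for brevity - it would be slow in practice:

import mysql.connector  # assumed driver; any Python DB API works here

def process_row(row):
    # Plain Python per row - no Spark writer involved.
    conn = mysql.connector.connect(
        host="localhost", database="database", user="writer", password="1234"
    )
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO mytopic (`Timestamp`, `Signal`, `Value`) VALUES (%s, %s, %s)",
        (row.Timestamp, row.Signal, row.Value),
    )
    conn.commit()
    conn.close()

query = df2.writeStream.foreach(process_row).start()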
If you really need Spark's support (and do want to use write.jdbc), you should actually use foreachBatch:
while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.
With Jacek's support, I could fix my example:
def process_row(df, epoch_id):
    # df is the micro-batch as a regular DataFrame, so write.jdbc is available
    df.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)

query = df2.writeStream.foreachBatch(process_row).start()
You also have to include epoch_id in the function parameters; otherwise you get errors in the Spark log files that are not shown in the Jupyter notebook. Note that inside the function you must write the batch DataFrame passed in as df, not the outer streaming DataFrame df2 - calling write on a streaming DataFrame raises the same kind of error.
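Putting it together, a minimal end-to-end sketch of the sink side (the checkpoint path is an assumption, and the MariaDB/MySQL JDBC driver must be on the Spark classpath):

def write_batch(df, epoch_id):
    # Each micro-batch arrives as a regular DataFrame and is appended via JDBC.
    df.write.jdbc(url=db_target_url, table="mytopic", mode="append",
                  properties=db_target_properties)

query = df2.writeStream \
    .foreachBatch(write_batch) \
    .option("checkpointLocation", "/tmp/checkpoints/mytopic") \
    .start()

query.awaitTermination()  # keep the streaming query running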