Insert/Upsert/Delete (CDC) PySpark Structured Streaming

1. Suppose we start with an initial file like this (a sketch of the raw pipe-delimited files follows right after this list):

   Id  Number  ChangeMode
   1   10      insert
   2   20      insert
   3   30      insert
   4   40      insert
   5   50      insert

2. My table in MariaDB should then look like this:

   Id  Number
   1   10
   2   20
   3   30
   4   40
   5   50

3. Then another file like this arrives in the input folder:

   Id  Number  ChangeMode
   1   123     upsert
   2   456     upsert
   3   30      remove

4. And the table should end up like this:

   Id  Number
   1   123
   2   456
   4   40
   5   50
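
Since the reader below treats the files as header-less CSV with "|" as the separator, the two files above would look roughly like this on disk (the file names are only placeholders):

file_1.csv

1|10|insert
2|20|insert
3|30|insert
4|40|insert
5|50|insert

file_2.csv

1|123|upsert
2|456|upsert
3|30|remove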

How can I use the "ChangeMode" column to decide when to trigger an insert, update, or delete?

I have written this part of the code, but I don't know how to continue from here, and I don't know how to implement the delete.

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = (SparkSession
    .builder
    .appName("Spark Structured Streaming CDC")
    .config("spark.driver.extraClassPath", "E:\pyspark_projects\mariadb-java-client-2.7.1.jar")
    .getOrCreate())

streamingSchema = StructType([
    StructField("Id", IntegerType(),True),
    StructField("Number", IntegerType(),True),
    StructField("ChangeMode", StringType(),True),
])

streamingDF = (spark.readStream
        .format("csv")
        .option("sep", "|")
        .schema(streamingSchema)
        .csv("E:\pyspark_projects\stream_cdc\files\input\"))

db_target_properties = {"user":"root", "password":"root", "driver":"org.mariadb.jdbc.Driver"}
db_target_url = "jdbc:mariadb://127.0.0.1:3306/projects"

streamingInsert = streamingDF.where("ChangeMode == 'insert'")
streamingUpsert = streamingDF.where("ChangeMode == 'upsert'")

def insert(df, epoch_id):
    streamingInsert.write.jdbc(url=db_target_url, table="cdc", mode="append", properties=db_target_properties)
    pass

def upsert(df, epoch_id):
    streamingUpsert.write.jdbc(url=db_target_url, table="cdc", mode="update", properties=db_target_properties)
    pass

queryInsert = streamingInsert.writeStream.foreachBatch(insert).start()
queryUpdate = streamingUpsert.writeStream.foreachBatch(upsert).start()

spark.streams.awaitAnyTermination()

I get the following error:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "C:\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "C:\Spark\python\pyspark\sql\utils.py", line 207, in call
raise e
File "C:\Spark\python\pyspark\sql\utils.py", line 204, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "main.py", line 32, in insert
streamingInsert.write.jdbc(url=db_target_url, table="cdc", mode="append", properties=db_target_properties)
File "C:\Spark\python\pyspark\sql\dataframe.py", line 231, in write
return DataFrameWriter(self)
File "C:\Spark\python\pyspark\sql\readwriter.py", line 645, in __init__
self._jwrite = df._jdf.write()
File "C:\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Spark\python\pyspark\sql\utils.py", line 134, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame;

at py4j.Protocol.getReturnValue(Protocol.java:476)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy17.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch(ForeachBatchSink.scala:56)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$adapted(ForeachBatchSink.scala:56)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:572)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:570)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:570)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:223)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:245)

If anyone knows another way to do this, please let me know.
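
For anyone hitting the same exception: it is raised because the functions passed to foreachBatch still reference streamingInsert/streamingUpsert from the outer scope, which are streaming DataFrames and cannot be written with .write. Inside foreachBatch you have to work with the df argument, which is an ordinary batch DataFrame for that micro-batch. Note also that the JDBC writer has no "update" save mode (only append, overwrite, ignore, and error/errorifexists), so upserts and deletes still need hand-written SQL. A minimal sketch of the corrected insert path, reusing the variables defined above:

def insert(df, epoch_id):
    # df is the micro-batch as a normal (non-streaming) DataFrame,
    # so calling .write here is allowed.
    (df.filter("ChangeMode = 'insert'")
       .select("Id", "Number")
       .write
       .jdbc(url=db_target_url, table="cdc", mode="append",
             properties=db_target_properties))

queryInsert = streamingDF.writeStream.foreachBatch(insert).start()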

I found a way to do it, using another module to write to MariaDB: insert/update are handled with a single statement, and delete with a separate one.

Hope this helps someone in the future!

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import mariadb

spark = (SparkSession
    .builder
    .appName("Spark Structured Streaming CDC")
    .getOrCreate())

streamingSchema = StructType([
    StructField("Id", IntegerType(),True),
    StructField("Number", IntegerType(),True),
    StructField("ChangeMode", StringType(),True)
])

streamingDF = (spark.readStream
    .format("csv")
    .option("sep", "|")
    .schema(streamingSchema)
    .csv("E:\pyspark_projects\stream_cdc\files\input\"))

class RowWriter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True
    def process(self, row):
        # Open a connection per row: simple, but for real workloads you
        # would open it once in open() and close it in close().
        conn = mariadb.connect(
            user="root",
            password="root",
            host="127.0.0.1",
            port=3306,
            database="projects"
        )
        cur = conn.cursor()
        # Insert and upsert share one statement: ON DUPLICATE KEY UPDATE
        # turns the INSERT into an UPDATE when the Id already exists.
        if row[2] in ('insert', 'upsert', 'update'):
            cur.execute(
                "INSERT INTO cdc (Id, Number) VALUES (?, ?) "
                "ON DUPLICATE KEY UPDATE Number = ?",
                (row[0], row[1], row[1]))
        elif row[2] in ('remove', 'delete'):
            cur.execute("DELETE FROM cdc WHERE Id = ?", (row[0],))
        conn.commit()
        conn.close()
        
    def close(self, error):
        print("Closed with error: %s" % str(error))

query = (streamingDF.writeStream
    .foreach(RowWriter())
    .option("checkpointLocation", "E:\pyspark_projects\stream_cdc\files\checkpoint")
    .start())

query.awaitTermination()
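
One thing to watch out for: ON DUPLICATE KEY UPDATE only turns the INSERT into an UPDATE if the cdc table has a PRIMARY KEY (or a UNIQUE index) on Id; without one, every upsert simply adds a new row. A one-off setup sketch, where the column types are an assumption:

import mariadb

# Hypothetical one-off setup: give cdc a primary key on Id so that
# ON DUPLICATE KEY UPDATE can locate the row to update.
conn = mariadb.connect(user="root", password="root",
                       host="127.0.0.1", port=3306, database="projects")
cur = conn.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS cdc (Id INT PRIMARY KEY, Number INT)")
conn.commit()
conn.close()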