Insert/Upsert/Delete (CDC) with PySpark Structured Streaming
- Suppose we have an initial file like this:
Id | Number | ChangeMode |
---|---|---|
1 | 10 | insert |
2 | 20 | insert |
3 | 30 | insert |
4 | 40 | insert |
5 | 50 | insert |
- My table in MariaDB should then look like this:
Id | Number |
---|---|
1 | 10 |
2 | 20 |
3 | 30 |
4 | 40 |
5 | 50 |
- Then another file like this arrives in the folder:
Id | Number | ChangeMode |
---|---|---|
1 | 123 | upsert |
2 | 456 | upsert |
3 | 30 | remove |
- And the table should end up like this:
Id | Number |
---|---|
1 | 123 |
2 | 456 |
4 | 40 |
5 | 50 |
How can I use the "ChangeMode" column as a reference to decide when to trigger an insert/update/delete?
I have written this part of the code, but I don't know how to go on from here, nor how to implement the delete.
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = (SparkSession
    .builder
    .appName("Spark Structured Streaming CDC")
    .config("spark.driver.extraClassPath", "E:\\pyspark_projects\\mariadb-java-client-2.7.1.jar")
    .getOrCreate())

streamingSchema = StructType([
    StructField("Id", IntegerType(), True),
    StructField("Number", IntegerType(), True),
    StructField("ChangeMode", StringType(), True),
])

streamingDF = (spark.readStream
    .format("csv")
    .option("sep", "|")
    .schema(streamingSchema)
    .csv("E:\\pyspark_projects\\stream_cdc\\files\\input\\"))

db_target_properties = {"user": "root", "password": "root", "driver": "org.mariadb.jdbc.Driver"}
db_target_url = "jdbc:mariadb://127.0.0.1:3306/projects"

streamingInsert = streamingDF.where("ChangeMode == 'insert'")
streamingUpsert = streamingDF.where("ChangeMode == 'upsert'")

def insert(df, epoch_id):
    streamingInsert.write.jdbc(url=db_target_url, table="cdc", mode="append", properties=db_target_properties)
    pass

def upsert(df, epoch_id):
    streamingUpsert.write.jdbc(url=db_target_url, table="cdc", mode="update", properties=db_target_properties)
    pass

queryInsert = streamingInsert.writeStream.foreachBatch(insert).start()
queryUpdate = streamingUpsert.writeStream.foreachBatch(upsert).start()

spark.streams.awaitAnyTermination()
I get the following error:
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "C:\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "C:\Spark\python\pyspark\sql\utils.py", line 207, in call
raise e
File "C:\Spark\python\pyspark\sql\utils.py", line 204, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "main.py", line 32, in insert
streamingInsert.write.jdbc(url=db_target_url, table="cdc", mode="append", properties=db_target_properties)
File "C:\Spark\python\pyspark\sql\dataframe.py", line 231, in write
return DataFrameWriter(self)
File "C:\Spark\python\pyspark\sql\readwriter.py", line 645, in __init__
self._jwrite = df._jdf.write()
File "C:\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Spark\python\pyspark\sql\utils.py", line 134, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame;
at py4j.Protocol.getReturnValue(Protocol.java:476)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy17.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch(ForeachBatchSink.scala:56)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$adapted(ForeachBatchSink.scala:56)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:572)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:570)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:570)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:223)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:245)
If anyone knows another way of doing this, please let me know.
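For context, the AnalysisException is raised because the insert and upsert functions call .write on the original streaming DataFrames (streamingInsert / streamingUpsert) instead of the micro-batch DataFrame df that Spark passes to foreachBatch; only that batch DataFrame can be written with the batch API. Below is a minimal sketch of a single foreachBatch handler that routes each row by ChangeMode, reusing the streamingDF, cdc table, and credentials from the code above; the apply_cdc name and the use of the mariadb module here are illustrative assumptions, not the only way to do this:

import mariadb

def apply_cdc(df, epoch_id):
    # Work on the micro-batch DataFrame that foreachBatch passes in,
    # never on the original streaming DataFrame.
    rows = df.collect()  # acceptable for small CDC batches
    conn = mariadb.connect(user="root", password="root",
                           host="127.0.0.1", port=3306, database="projects")
    cur = conn.cursor()
    for row in rows:
        if row["ChangeMode"] in ("insert", "upsert"):
            # Requires a unique/primary key on Id for the upsert to work.
            cur.execute(
                "INSERT INTO cdc (Id, Number) VALUES (?, ?) "
                "ON DUPLICATE KEY UPDATE Number = ?",
                (row["Id"], row["Number"], row["Number"]))
        elif row["ChangeMode"] == "remove":
            cur.execute("DELETE FROM cdc WHERE Id = ?", (row["Id"],))
    conn.commit()
    conn.close()

query = (streamingDF.writeStream
    .foreachBatch(apply_cdc)
    .option("checkpointLocation", "E:\\pyspark_projects\\stream_cdc\\files\\checkpoint")
    .start())
query.awaitTermination()

Compared with a per-row foreach sink, this opens one database connection per micro-batch instead of one per row.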
I found a way to do it using another module to write to MariaDB: insert/update is handled with a single statement, and the delete with a separate one.
Hope it helps someone in the future!
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import mariadb

spark = (SparkSession
    .builder
    .appName("Spark Structured Streaming CDC")
    .getOrCreate())

streamingSchema = StructType([
    StructField("Id", IntegerType(), True),
    StructField("Number", IntegerType(), True),
    StructField("ChangeMode", StringType(), True)
])

streamingDF = (spark.readStream
    .format("csv")
    .option("sep", "|")
    .schema(streamingSchema)
    .csv("E:\\pyspark_projects\\stream_cdc\\files\\input\\"))

class RowWriter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        # One connection per row: simple, but costly for large batches.
        conn = mariadb.connect(
            user="root",
            password="root",
            host="127.0.0.1",
            port=3306,
            database="projects"
        )
        cur = conn.cursor()
        # Route on the ChangeMode values used in the example files above.
        if row[2] in ('insert', 'upsert'):
            cur.execute("INSERT INTO cdc (Id, Number) VALUES (" + str(row[0]) + ", " + str(row[1]) + ") ON DUPLICATE KEY UPDATE Number = " + str(row[1]))
        if row[2] == 'remove':
            cur.execute("DELETE FROM cdc WHERE Id = " + str(row[0]))
        conn.commit()
        conn.close()

    def close(self, error):
        print("Closed with error: %s" % str(error))

query = (streamingDF.writeStream
    .foreach(RowWriter())
    .option("checkpointLocation", "E:\\pyspark_projects\\stream_cdc\\files\\checkpoint")
    .start())

query.awaitTermination()
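One detail worth checking: the ON DUPLICATE KEY UPDATE clause only turns the INSERT into an update when the cdc table has a primary key (or unique index) on Id; otherwise every upsert row is simply appended. A one-off setup sketch follows, where the DDL is an assumption inferred from the integer schema above, not the author's actual table definition:

import mariadb

conn = mariadb.connect(user="root", password="root",
                       host="127.0.0.1", port=3306, database="projects")
cur = conn.cursor()
# Id as PRIMARY KEY so that ON DUPLICATE KEY UPDATE fires on repeated Ids.
cur.execute("CREATE TABLE IF NOT EXISTS cdc (Id INT PRIMARY KEY, Number INT)")
conn.commit()
conn.close()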