How do I connect to Postgres over JDBC and write a stream to it in Spark 2.4.7?
df5.writeStream.format("jdbc").option("url", "url")\
.option("dbtable","test").option("user","postgres")\
.option("password", "password").start()
I always get:
Py4JJavaError: An error occurred while calling o112.start.
: java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:311)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:322)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
The error message states the cause plainly: Data source jdbc does not support streamed writing.
So if you want to write data from Structured Streaming to a JDBC sink, you need to use foreachBatch instead, like this:
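def foreach_batch_function(df, epoch_id):
    # df is a regular batch DataFrame here, so the batch JDBC writer (df.write) applies.
    # mode("append") keeps later micro-batches from failing once the table exists.
    df.write.format("jdbc").option("url", "url")\
        .option("dbtable","test").option("user","postgres")\
        .option("password", "password").mode("append").save()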
df5.writeStream.foreachBatch(foreach_batch_function).start()
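If you write to more than one table, it can help to build the batch function from a small factory instead of hard-coding the options. The following is only a sketch: the name make_jdbc_batch_writer, the example URL, the checkpoint path, and the explicit driver class are my own assumptions, not something from the question. It relies only on the standard DataFrameWriter calls format, options, mode, and save.

```python
def make_jdbc_batch_writer(url, table, user, password,
                           driver="org.postgresql.Driver"):
    """Return a function with the (batch_df, epoch_id) signature that
    DataStreamWriter.foreachBatch expects. Hypothetical helper name."""
    jdbc_options = {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": driver,  # assumes the PostgreSQL JDBC jar is on the classpath
    }

    def write_batch(batch_df, epoch_id):
        # Each micro-batch arrives as a non-streaming DataFrame,
        # so the ordinary batch JDBC writer can be used on it.
        (batch_df.write
            .format("jdbc")
            .options(**jdbc_options)
            .mode("append")
            .save())

    return write_batch


# Usage, assuming df5 is the streaming DataFrame from the question and
# that the URL and checkpoint path below are replaced with real values:
#
# writer = make_jdbc_batch_writer(
#     "jdbc:postgresql://localhost:5432/mydb", "test", "postgres", "password")
# query = (df5.writeStream
#          .foreachBatch(writer)
#          .option("checkpointLocation", "/tmp/checkpoints/test")
#          .start())
# query.awaitTermination()
```

Keep in mind that foreachBatch gives at-least-once delivery by default, so a batch can be written again after a failure; if duplicates matter, you would need to deduplicate yourself, for example by using epoch_id.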