How do I connect to Postgres over JDBC and write a streaming DataFrame in my Spark 2.4.7?

df5.writeStream.format("jdbc").option("url", "url")\
  .option("dbtable","test").option("user","postgres")\
  .option("password", "password").start()

I always get:

Py4JJavaError: An error occurred while calling o112.start.
: java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:311)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:322)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)

The error message states the cause clearly: `Data source jdbc does not support streamed writing`.

So if you want to write data from Structured Streaming to a JDBC sink, you need to use `foreachBatch` instead, like this:

def foreach_batch_function(df, epoch_id):
    # Inside foreachBatch each micro-batch is a regular DataFrame,
    # so use the batch writer (df.write, not df.format) and append
    # each batch to the target table.
    df.write.format("jdbc").option("url", "url")\
      .option("dbtable", "test").option("user", "postgres")\
      .option("password", "password").mode("append").save()

df5.writeStream.foreachBatch(foreach_batch_function).start()
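To see what `foreachBatch` is doing without needing a running Spark cluster or a Postgres instance, here is a plain-Python sketch of its contract: the streaming engine calls your function once per micro-batch, passing the batch and an epoch id, and your function performs an ordinary batch write. The `run_stream` driver and the `written` list below are hypothetical stand-ins for the streaming engine and the JDBC table.

```python
def run_stream(micro_batches, on_batch):
    """Hypothetical driver imitating Structured Streaming: invoke
    on_batch(batch, epoch_id) once for each incoming micro-batch."""
    for epoch_id, batch in enumerate(micro_batches):
        on_batch(batch, epoch_id)

written = []  # stand-in for the JDBC table

def foreach_batch_function(batch, epoch_id):
    # In real Spark code this is where you'd call:
    #   batch.write.format("jdbc")...mode("append").save()
    written.extend(batch)

# Three micro-batches arrive over time; each is appended to the sink.
run_stream([[1, 2], [3], [4, 5]], foreach_batch_function)
print(written)  # → [1, 2, 3, 4, 5]
```

The key point is that the streaming-specific part (micro-batching, epoch ids, restarts) stays in the engine, while your function only ever sees a finished batch, which is why the non-streaming JDBC writer works inside it.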