Azure Databricks Structured Streaming with ForeachWriter

I am trying to call a REST API by passing values to it directly from a Structured Streaming query, implemented as shown below. I don't expect any response from the REST call, but I do need to make sure that every call to the endpoint succeeds. How can I ensure this? At the moment none of the calls are going through, and I have no way to trace them. Any suggestions?

import org.apache.spark.sql.ForeachWriter

val writer = new ForeachWriter[String] {
  // Called once per partition and epoch; returning true means the rows get processed.
  override def open(partitionId: Long, version: Long): Boolean = true

  // Called once per row: shell out to curl and POST the value as JSON.
  // Note that value is interpolated unquoted, so it must be numeric for the body to be valid JSON.
  override def process(value: String): Unit = {
    import sys.process._
    val command = """curl -d '{"Id":""" + value + """}' -H "Content-Type: application/json" -X POST http://xx.xxx.xxx.xxx:xxxx/xxx/xxxxx/xxxx"""
    // .!! captures stdout and throws an exception if the command exits with a nonzero status
    Seq("/bin/bash", "-c", command).!!
  }

  override def close(errorOrNull: Throwable): Unit = {}
}

val results = output.map(r => r.getString(0))

results.writeStream
  .queryName("rest-api-processor")
  .foreach(writer)
  .start()
  .awaitTermination()
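
One way to make failures traceable is to check the exit status of the shell command instead of relying on .!!, and to pass curl's --fail flag so that HTTP 4xx/5xx responses also surface as a nonzero exit code. A minimal sketch of the process method along those lines (the endpoint URL is a placeholder, as above):

  // Sketch: surface curl failures instead of losing them silently.
  // --fail makes curl exit nonzero on HTTP 4xx/5xx; --silent suppresses progress output.
  override def process(value: String): Unit = {
    import sys.process._
    val command = """curl --fail --silent -d '{"Id":""" + value + """}' -H "Content-Type: application/json" -X POST http://xx.xxx.xxx.xxx:xxxx/xxx/xxxxx/xxxx"""
    // .! runs the command and returns its exit code rather than throwing
    val exitCode = Seq("/bin/bash", "-c", command).!
    if (exitCode != 0) {
      // Log the failure (or write it to a sink) so unsuccessful calls can be traced
      println(s"POST failed for value $value with exit code $exitCode")
    }
  }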

The command from Bash was executed in Spark without the .!! operator, as mentioned above; .! is used instead, which runs the command and returns its exit code rather than capturing output. Alternatively, we can also print the statement.

import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
    import sys.process._
    // Extract the first column; interpolating the Row itself would render as "[...]"
    val id = value.getString(0)
    val command = """curl -d '{"Id":""" + id + """}' -H "Content-Type: application/json" -X POST http://xx.xxx.xxx.xxx:xxxx/xxx/xxxxx/xxxx"""
    // .! executes the command and returns its exit code (unlike .!!, which throws on failure)
    Seq("/bin/bash", "-c", command).!
  }

  override def close(errorOrNull: Throwable): Unit = {}
}
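
Shelling out to curl makes it hard to inspect the HTTP status of each request. As an alternative sketch (assuming that checking the response code is enough to confirm success, and with the same placeholder endpoint URL), the POST can be issued with java.net.HttpURLConnection directly inside the ForeachWriter; httpWriter below is just an illustrative name:

import java.net.{HttpURLConnection, URL}
import org.apache.spark.sql.ForeachWriter

val httpWriter = new ForeachWriter[String] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: String): Unit = {
    val conn = new URL("http://xx.xxx.xxx.xxx:xxxx/xxx/xxxxx/xxxx")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(s"""{"Id":$value}""".getBytes("UTF-8"))
    // getResponseCode sends the request and blocks until the status line is read
    val status = conn.getResponseCode
    conn.disconnect()
    if (status < 200 || status >= 300) {
      // Flag anything outside the 2xx range so unsuccessful calls can be traced
      println(s"POST failed for value $value with HTTP status $status")
    }
  }

  override def close(errorOrNull: Throwable): Unit = {}
}

This keeps the success check inside process, so a failed call is detected per record rather than being hidden inside a child process.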