使用 ForeachWriter 的 Azure Databricks 结构化流
Azure Databricks Structured Streaming with ForeachWriter
我正在尝试通过直接从结构化流传递值来调用 REST API。我正在尝试以下列方式实现它。我不希望此 REST API 调用有任何响应,但我需要确保对端点的每次调用都成功。我怎样才能确保这一点?目前,所有拨打的电话都不成功,我无法追踪。有什么建议么。
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[String] {
override def open(partitionId: Long, version: Long) = true
override def process(value: String) = {
import sys.process._
val command = """curl -d '{"Id":"""+ value + """}' -H "Content-Type: application/json" -X POST http://xx.xxx.xxx.xxx:xxxx/xxx/xxxxx/xxxx"""
Seq("/bin/bash","-c",command).!!
}
override def close(errorOrNull: Throwable) = {}
}
val results = output.map(r => r.getString(0))
results.writeStream
.queryName("rest-api-processor")
.foreach(writer)
.start
.awaitTermination
在没有 .!!
的 Spark 中执行了来自 Bash 的命令,如前所述。或者,我们也可以打印语句。
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long) = true
override def process(value: Row) = {
import sys.process._
val command = """curl -d '{"Id":"""+ value + """}' -H "Content-Type: application/json" -X POST http://xx.xxx.xxx.xxx:xxxx/xxx/xxxxx/xxxx"""
Seq("/bin/bash","-c",command)
}
override def close(errorOrNull: Throwable) = {}
}
我正在尝试通过直接从结构化流传递值来调用 REST API。我正在尝试以下列方式实现它。我不希望此 REST API 调用有任何响应,但我需要确保对端点的每次调用都成功。我怎样才能确保这一点?目前,所有拨打的电话都不成功,我无法追踪。有什么建议么。
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[String] {
override def open(partitionId: Long, version: Long) = true
override def process(value: String) = {
import sys.process._
val command = """curl -d '{"Id":"""+ value + """}' -H "Content-Type: application/json" -X POST http://xx.xxx.xxx.xxx:xxxx/xxx/xxxxx/xxxx"""
Seq("/bin/bash","-c",command).!!
}
override def close(errorOrNull: Throwable) = {}
}
val results = output.map(r => r.getString(0))
results.writeStream
.queryName("rest-api-processor")
.foreach(writer)
.start
.awaitTermination
在没有 .!!
的 Spark 中执行了来自 Bash 的命令,如前所述。或者,我们也可以打印语句。
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long) = true
override def process(value: Row) = {
import sys.process._
val command = """curl -d '{"Id":"""+ value + """}' -H "Content-Type: application/json" -X POST http://xx.xxx.xxx.xxx:xxxx/xxx/xxxxx/xxxx"""
Seq("/bin/bash","-c",command)
}
override def close(errorOrNull: Throwable) = {}
}