使用 foreachPartition 在 Spark 上发出 HTTP post 请求

Making HTTP post requests on Spark usign foreachPartition

需要一些帮助来理解以下在 Spark 中的行为(使用 Scala 和 Databricks)

我有一些数据帧(如果重要的话,从 S3 读取),并且会通过 HTTP post 请求以 1000 次(最多)为一批发送该数据。所以我对数据框进行了重新分区,以确保每个分区的记录不超过 1000 条。另外,为每一行创建了一个 json 列(所以我只需要稍后将它们放在一个数组中)

问题出在提出要求上。我使用以下代码

创建了以下可序列化 class
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.HttpHeaders
import org.apache.http.entity.StringEntity
import org.apache.commons.io.IOUtils

object postObject extends Serializable{
  val client = HttpClientBuilder.create().build()
  val post = new HttpPost("https://my-cool-api-endpoint")
  post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
  def makeHttpCall(row: Iterator[Row]) = {
      val json_str = """{"people": [""" + row.toSeq.map(x => x.getAs[String]("json")).mkString(",") + "]}"      
      post.setEntity(new StringEntity(json_str))
      val response = client.execute(post)
      val entity = response.getEntity()
      println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
      println(IOUtils.toString(entity.getContent()))
  }
}

现在,当我尝试以下操作时:

postObject.makeHttpCall(data.head(2).toIterator)

它就像一个魅力。请求通过,屏幕上有一些输出,我的 API 获取了该数据。

但是当我尝试将其放入 foreachPartition 时:

data.foreachPartition { x => 
  postObject.makeHttpCall(x)
}

没有任何反应。屏幕上没有输出,我的 API 中没有任何内容。如果我尝试重新运行它,几乎所有阶段都会跳过。我相信,出于任何原因,它只是懒惰地评估我的请求,而不是实际执行它。我不明白为什么,以及如何强制它。

postObject 有 2 个字段:clientpost 必须序列化。

我不确定 client 是否正确序列化。 post 对象可能从多个分区(在同一个 worker 上)发生变异。这里有很多事情可能出错。

我建议尝试删除 postObject 并将其主体直接内联到 foreachPartition

加法:

自己尝试运行:

sc.parallelize((1 to 10).toList).foreachPartition(row => {
        val client = HttpClientBuilder.create().build()
        val post = new HttpPost("https://google.com")
        post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
        val json_str = """{"people": [""" + row.toSeq.map(x => x.toString).mkString(",") + "]}"
        post.setEntity(new StringEntity(json_str))
        val response = client.execute(post)
        val entity = response.getEntity()
        println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
        println(IOUtils.toString(entity.getContent()))
      })

运行 它在本地和集群中。 它成功完成并将 405 错误打印到工作日志。 所以请求肯定会命中服务器。

foreachPartition returns 结果什么也没有。要调试您的问题,您可以将其更改为 mapPartitions:

val responseCodes = sc.parallelize((1 to 10).toList).mapPartitions(row => {
        val client = HttpClientBuilder.create().build()
        val post = new HttpPost("https://google.com")
        post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
        val json_str = """{"people": [""" + row.toSeq.map(x => x.toString).mkString(",") + "]}"
        post.setEntity(new StringEntity(json_str))
        val response = client.execute(post)
        val entity = response.getEntity()
        println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
        println(IOUtils.toString(entity.getContent()))
        Iterator.single(response.getStatusLine.getStatusCode)
      }).collect()

println(responseCodes.mkString(", "))

此代码 returns 响应代码列表,以便您对其进行分析。 对我来说,它按预期打印 405, 405

有一种方法可以做到这一点,而不必找出究竟是什么不可序列化。如果你想保持你代码的结构,你可以让所有字段@transient lazy val。此外,任何有副作用的调用都应该包含在一个块中。例如

val post = {
  val httpPost = new HttpPost("https://my-cool-api-endpoint")
  httpPost.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
  httpPost
}

这将延迟所有字段的初始化,直到它们被工作人员使用。每个工作人员都有一个对象实例,您将能够调用 makeHttpCall 方法。