使用 foreachPartition 在 Spark 上发出 HTTP post 请求
Making HTTP post requests on Spark usign foreachPartition
需要一些帮助来理解以下在 Spark 中的行为(使用 Scala 和 Databricks)
我有一些数据帧(如果重要的话,从 S3 读取),并且会通过 HTTP post 请求以 1000 次(最多)为一批发送该数据。所以我对数据框进行了重新分区,以确保每个分区的记录不超过 1000 条。另外,为每一行创建了一个 json 列(所以我只需要稍后将它们放在一个数组中)
问题出在提出要求上。我使用以下代码
创建了以下可序列化 class
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.HttpHeaders
import org.apache.http.entity.StringEntity
import org.apache.commons.io.IOUtils
object postObject extends Serializable{
val client = HttpClientBuilder.create().build()
val post = new HttpPost("https://my-cool-api-endpoint")
post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
def makeHttpCall(row: Iterator[Row]) = {
val json_str = """{"people": [""" + row.toSeq.map(x => x.getAs[String]("json")).mkString(",") + "]}"
post.setEntity(new StringEntity(json_str))
val response = client.execute(post)
val entity = response.getEntity()
println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
println(IOUtils.toString(entity.getContent()))
}
}
现在,当我尝试以下操作时:
postObject.makeHttpCall(data.head(2).toIterator)
它就像一个魅力。请求通过,屏幕上有一些输出,我的 API 获取了该数据。
但是当我尝试将其放入 foreachPartition 时:
data.foreachPartition { x =>
postObject.makeHttpCall(x)
}
没有任何反应。屏幕上没有输出,我的 API 中没有任何内容。如果我尝试重新运行它,几乎所有阶段都会跳过。我相信,出于任何原因,它只是懒惰地评估我的请求,而不是实际执行它。我不明白为什么,以及如何强制它。
postObject
有 2 个字段:client
和 post
必须序列化。
我不确定 client
是否正确序列化。 post
对象可能从多个分区(在同一个 worker 上)发生变异。这里有很多事情可能出错。
我建议尝试删除 postObject
并将其主体直接内联到 foreachPartition
。
加法:
自己尝试运行:
sc.parallelize((1 to 10).toList).foreachPartition(row => {
val client = HttpClientBuilder.create().build()
val post = new HttpPost("https://google.com")
post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
val json_str = """{"people": [""" + row.toSeq.map(x => x.toString).mkString(",") + "]}"
post.setEntity(new StringEntity(json_str))
val response = client.execute(post)
val entity = response.getEntity()
println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
println(IOUtils.toString(entity.getContent()))
})
运行 它在本地和集群中。
它成功完成并将 405 错误打印到工作日志。
所以请求肯定会命中服务器。
foreachPartition
returns 结果什么也没有。要调试您的问题,您可以将其更改为 mapPartitions
:
val responseCodes = sc.parallelize((1 to 10).toList).mapPartitions(row => {
val client = HttpClientBuilder.create().build()
val post = new HttpPost("https://google.com")
post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
val json_str = """{"people": [""" + row.toSeq.map(x => x.toString).mkString(",") + "]}"
post.setEntity(new StringEntity(json_str))
val response = client.execute(post)
val entity = response.getEntity()
println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
println(IOUtils.toString(entity.getContent()))
Iterator.single(response.getStatusLine.getStatusCode)
}).collect()
println(responseCodes.mkString(", "))
此代码 returns 响应代码列表,以便您对其进行分析。
对我来说,它按预期打印 405, 405
。
有一种方法可以做到这一点,而不必找出究竟是什么不可序列化。如果你想保持你代码的结构,你可以让所有字段@transient lazy val
。此外,任何有副作用的调用都应该包含在一个块中。例如
val post = {
val httpPost = new HttpPost("https://my-cool-api-endpoint")
httpPost.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
httpPost
}
这将延迟所有字段的初始化,直到它们被工作人员使用。每个工作人员都有一个对象实例,您将能够调用 makeHttpCall
方法。
需要一些帮助来理解以下在 Spark 中的行为(使用 Scala 和 Databricks)
我有一些数据帧(如果重要的话,从 S3 读取),并且会通过 HTTP post 请求以 1000 次(最多)为一批发送该数据。所以我对数据框进行了重新分区,以确保每个分区的记录不超过 1000 条。另外,为每一行创建了一个 json 列(所以我只需要稍后将它们放在一个数组中)
问题出在提出要求上。我使用以下代码
创建了以下可序列化 classimport org.apache.spark.sql.{DataFrame, Row}
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.HttpHeaders
import org.apache.http.entity.StringEntity
import org.apache.commons.io.IOUtils
object postObject extends Serializable{
val client = HttpClientBuilder.create().build()
val post = new HttpPost("https://my-cool-api-endpoint")
post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
def makeHttpCall(row: Iterator[Row]) = {
val json_str = """{"people": [""" + row.toSeq.map(x => x.getAs[String]("json")).mkString(",") + "]}"
post.setEntity(new StringEntity(json_str))
val response = client.execute(post)
val entity = response.getEntity()
println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
println(IOUtils.toString(entity.getContent()))
}
}
现在,当我尝试以下操作时:
postObject.makeHttpCall(data.head(2).toIterator)
它就像一个魅力。请求通过,屏幕上有一些输出,我的 API 获取了该数据。
但是当我尝试将其放入 foreachPartition 时:
data.foreachPartition { x =>
postObject.makeHttpCall(x)
}
没有任何反应。屏幕上没有输出,我的 API 中没有任何内容。如果我尝试重新运行它,几乎所有阶段都会跳过。我相信,出于任何原因,它只是懒惰地评估我的请求,而不是实际执行它。我不明白为什么,以及如何强制它。
postObject
有 2 个字段:client
和 post
必须序列化。
我不确定 client
是否正确序列化。 post
对象可能从多个分区(在同一个 worker 上)发生变异。这里有很多事情可能出错。
我建议尝试删除 postObject
并将其主体直接内联到 foreachPartition
。
加法:
自己尝试运行:
sc.parallelize((1 to 10).toList).foreachPartition(row => {
val client = HttpClientBuilder.create().build()
val post = new HttpPost("https://google.com")
post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
val json_str = """{"people": [""" + row.toSeq.map(x => x.toString).mkString(",") + "]}"
post.setEntity(new StringEntity(json_str))
val response = client.execute(post)
val entity = response.getEntity()
println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
println(IOUtils.toString(entity.getContent()))
})
运行 它在本地和集群中。 它成功完成并将 405 错误打印到工作日志。 所以请求肯定会命中服务器。
foreachPartition
returns 结果什么也没有。要调试您的问题,您可以将其更改为 mapPartitions
:
val responseCodes = sc.parallelize((1 to 10).toList).mapPartitions(row => {
val client = HttpClientBuilder.create().build()
val post = new HttpPost("https://google.com")
post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
val json_str = """{"people": [""" + row.toSeq.map(x => x.toString).mkString(",") + "]}"
post.setEntity(new StringEntity(json_str))
val response = client.execute(post)
val entity = response.getEntity()
println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
println(IOUtils.toString(entity.getContent()))
Iterator.single(response.getStatusLine.getStatusCode)
}).collect()
println(responseCodes.mkString(", "))
此代码 returns 响应代码列表,以便您对其进行分析。
对我来说,它按预期打印 405, 405
。
有一种方法可以做到这一点,而不必找出究竟是什么不可序列化。如果你想保持你代码的结构,你可以让所有字段@transient lazy val
。此外,任何有副作用的调用都应该包含在一个块中。例如
val post = {
val httpPost = new HttpPost("https://my-cool-api-endpoint")
httpPost.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
httpPost
}
这将延迟所有字段的初始化,直到它们被工作人员使用。每个工作人员都有一个对象实例,您将能够调用 makeHttpCall
方法。