与 foreachRDD Spark Streaming 的数据库连接
DB connection with foreachRDD Spark Streaming
我在流式传输数据时创建并传递到数据库的连接。每次从文件中读取数据并创建 Neo4j 会话都会增加性能开销。如何更改现有代码以提高应用程序的性能?我是否应该将 foreachRDD 更改为 foreachPartition 以便为连接创建一个单独的对象?
这是我的流媒体代码:
val wordsArrays: DStream[Array[String]] = values.map(t => t.split(", "))
wordsArrays.foreachRDD(rdd => {
rdd.flatMap(
data => {
val recommendations = execNeo4jSearchQuery(neo4jConfigs.getNeo4jConfig(args(1)), data)
val calendarTime = Calendar.getInstance.getTime
val recommendationsMap = convertDataToMap(recommendations, calendarTime)
recommendationsMap
}).saveToEs("rdd-timed/output")
}
)
foreachPartiotion 使您能够为每个分区而不是每个地图迭代创建一个对象,
当您需要为每个分区创建一个连接时,它很有用。
但在您的情况下,您创建的所有对象似乎都取决于地图的输入值或当前时间。所以我看不出它对你有什么帮助。
除非你在 execNeo4jSearchQuery 的每个 运行 中创建一个连接,否则我看不出它对你有什么帮助,但如果你确实在每次调用不依赖于数据的函数时创建一个连接,那么它将有所帮助改进代码。 (但很可能瓶颈不存在,所以你不会看到很大的改进)。
最好使用带有 mapPartitions 的数据库连接,然后将带有更新分区的 rdd 保存到 ElasticSearch:
wordsArrays.foreachRDD(rdd => {
rdd.mapPartitions { partition => {
val neo4jConfig = neo4jConfigurations.getNeo4jConfig(args(1))
val result = partition.map( data => {
val recommendations = execNeo4jSearchQuery(neo4jConfig, data)
val calendarTime = Calendar.getInstance.getTime
convertDataToMap(recommendations, calendarTime)
}).toList.flatten
result.iterator
}
}.saveToEs("rdd-timed/output")
})
我在流式传输数据时创建并传递到数据库的连接。每次从文件中读取数据并创建 Neo4j 会话都会增加性能开销。如何更改现有代码以提高应用程序的性能?我是否应该将 foreachRDD 更改为 foreachPartition 以便为连接创建一个单独的对象?
这是我的流媒体代码:
val wordsArrays: DStream[Array[String]] = values.map(t => t.split(", "))
wordsArrays.foreachRDD(rdd => {
rdd.flatMap(
data => {
val recommendations = execNeo4jSearchQuery(neo4jConfigs.getNeo4jConfig(args(1)), data)
val calendarTime = Calendar.getInstance.getTime
val recommendationsMap = convertDataToMap(recommendations, calendarTime)
recommendationsMap
}).saveToEs("rdd-timed/output")
}
)
foreachPartiotion 使您能够为每个分区而不是每个地图迭代创建一个对象, 当您需要为每个分区创建一个连接时,它很有用。
但在您的情况下,您创建的所有对象似乎都取决于地图的输入值或当前时间。所以我看不出它对你有什么帮助。
除非你在 execNeo4jSearchQuery 的每个 运行 中创建一个连接,否则我看不出它对你有什么帮助,但如果你确实在每次调用不依赖于数据的函数时创建一个连接,那么它将有所帮助改进代码。 (但很可能瓶颈不存在,所以你不会看到很大的改进)。
最好使用带有 mapPartitions 的数据库连接,然后将带有更新分区的 rdd 保存到 ElasticSearch:
wordsArrays.foreachRDD(rdd => {
rdd.mapPartitions { partition => {
val neo4jConfig = neo4jConfigurations.getNeo4jConfig(args(1))
val result = partition.map( data => {
val recommendations = execNeo4jSearchQuery(neo4jConfig, data)
val calendarTime = Calendar.getInstance.getTime
convertDataToMap(recommendations, calendarTime)
}).toList.flatten
result.iterator
}
}.saveToEs("rdd-timed/output")
})