与 foreachRDD Spark Streaming 的数据库连接

DB connection with foreachRDD Spark Streaming

我在流式传输数据时创建并传递到数据库的连接。每次从文件中读取数据并创建 Neo4j 会话都会增加性能开销。如何更改现有代码以提高应用程序的性能?我是否应该将 foreachRDD 更改为 foreachPartition 以便为连接创建一个单独的对象?

这是我的流媒体代码:

val wordsArrays: DStream[Array[String]] = values.map(t => t.split(", "))

wordsArrays.foreachRDD(rdd => {

  rdd.flatMap(
  data => {
    val recommendations = execNeo4jSearchQuery(neo4jConfigs.getNeo4jConfig(args(1)), data)
    val calendarTime = Calendar.getInstance.getTime
    val recommendationsMap = convertDataToMap(recommendations, calendarTime)
    recommendationsMap

  }).saveToEs("rdd-timed/output")
 }
)

foreachPartiotion 使您能够为每个分区而不是每个地图迭代创建一个对象, 当您需要为每个分区创建一个连接时,它很有用。

但在您的情况下,您创建的所有对象似乎都取决于地图的输入值或当前时间。所以我看不出它对你有什么帮助。

除非你在 execNeo4jSearchQuery 的每个 运行 中创建一个连接,否则我看不出它对你有什么帮助,但如果你确实在每次调用不依赖于数据的函数时创建一个连接,那么它将有所帮助改进代码。 (但很可能瓶颈不存在,所以你不会看到很大的改进)。

最好使用带有 mapPartitions 的数据库连接,然后将带有更新分区的 rdd 保存到 ElasticSearch:

 wordsArrays.foreachRDD(rdd => {

      rdd.mapPartitions { partition => {
            val neo4jConfig = neo4jConfigurations.getNeo4jConfig(args(1))

            val result = partition.map( data => {

              val recommendations = execNeo4jSearchQuery(neo4jConfig, data)
              val calendarTime = Calendar.getInstance.getTime
              convertDataToMap(recommendations, calendarTime)

          }).toList.flatten
          result.iterator
        }
      }.saveToEs("rdd-timed/output")
    })