如何在 Spark Streaming for Lookups 中创建到数据源的连接

How to create connection(s) to a Datasource in Spark Streaming for Lookups

我有一个用例,我们正在流式传输事件,对于每个事件我都必须进行一些查找。查找在 Redis 中,我想知道创建连接的最佳方法是什么。火花流将 运行 40 个执行程序,我有 5 个这样的流作业都连接到同一个 Redis 集群。所以我很困惑我应该采用什么方法来创建 Redis 连接

  1. 在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以使用广播变量吗?

  2. 为每个分区创建一个Redis连接,但是我的代码是这样写的

    val update = xyz.transform(rdd => { // on driver if (xyz.isNewDay) { ..... } rdd }) update.foreachRDD(rdd => { rdd.foreachPartition(partition => { partition.foreach(Key_trans => { // perform some lookups logic here } } })

所以现在如果我在每个分区内创建一个连接,这意味着对于每个 RDD 和该 RDD 中的每个分区我都将创建一个新连接。

有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?

如果需要,我可以添加更多 context/info。

1.在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以使用广播变量吗?

答案 - 不能。由于与连接关联的机器相关数据,大多数连接对象不可序列化。

2。有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?

Ans- 是的,创建一个连接池并在分区中使用它。这是风格。你可以像这样创建一个连接池 https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala

然后使用它

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

请检查: design pattern for using foreachRDD