如何在 Spark Streaming for Lookups 中创建到数据源的连接
How to create connection(s) to a Datasource in Spark Streaming for Lookups
我有一个用例,我们正在流式传输事件,对于每个事件我都必须进行一些查找。查找在 Redis 中,我想知道创建连接的最佳方法是什么。火花流将 运行 40 个执行程序,我有 5 个这样的流作业都连接到同一个 Redis 集群。所以我很困惑我应该采用什么方法来创建 Redis 连接
在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以使用广播变量吗?
为每个分区创建一个Redis连接,但是我的代码是这样写的
val update = xyz.transform(rdd => {
// on driver
if (xyz.isNewDay) {
.....
}
rdd
})
update.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
partition.foreach(Key_trans => {
// perform some lookups logic here
}
}
})
所以现在如果我在每个分区内创建一个连接,这意味着对于每个 RDD 和该 RDD 中的每个分区我都将创建一个新连接。
有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?
如果需要,我可以添加更多 context/info。
1.在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以使用广播变量吗?
答案 - 不能。由于与连接关联的机器相关数据,大多数连接对象不可序列化。
2。有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?
Ans- 是的,创建一个连接池并在分区中使用它。这是风格。你可以像这样创建一个连接池 https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
然后使用它
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
我有一个用例,我们正在流式传输事件,对于每个事件我都必须进行一些查找。查找在 Redis 中,我想知道创建连接的最佳方法是什么。火花流将 运行 40 个执行程序,我有 5 个这样的流作业都连接到同一个 Redis 集群。所以我很困惑我应该采用什么方法来创建 Redis 连接
在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以使用广播变量吗?
为每个分区创建一个Redis连接,但是我的代码是这样写的
val update = xyz.transform(rdd => { // on driver if (xyz.isNewDay) { ..... } rdd }) update.foreachRDD(rdd => { rdd.foreachPartition(partition => { partition.foreach(Key_trans => { // perform some lookups logic here } } })
所以现在如果我在每个分区内创建一个连接,这意味着对于每个 RDD 和该 RDD 中的每个分区我都将创建一个新连接。
有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?
如果需要,我可以添加更多 context/info。
1.在驱动程序上创建一个连接对象并将其广播给执行程序(不确定它是否真的有效,因为我必须使该对象可序列化)。我可以使用广播变量吗?
答案 - 不能。由于与连接关联的机器相关数据,大多数连接对象不可序列化。
2。有没有一种方法可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接?
Ans- 是的,创建一个连接池并在分区中使用它。这是风格。你可以像这样创建一个连接池 https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
然后使用它
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}