Spark 流:使用参考数据丰富流
Spark streams: enrich stream with reference data
我设置了 spark streaming,以便它从套接字读取数据,在将数据发布到 rabbit 队列之前对数据进行一些丰富。
在设置流上下文之前,该扩充项从通过读取常规文本文件 (Source.fromFile...) 实例化的地图中查找信息。
我觉得这不是真正应该做的事情。另一方面,当使用 StreamingContext 时,我只能从流中读取,而不能像使用 SparkContext 那样从静态文件中读取。
我可以尝试允许多个上下文,但我也不确定这是否是正确的方法。
如有任何建议,我们将不胜感激。
如果您的文件很小并且不在分布式文件系统上,Source.fromFile
就可以了(无论完成什么工作)。
如果你想通过SparkContext读取文件,你仍然可以通过streamingContext.sparkContext
访问它,并将它与transform
或foreachRDD
.
中的DStream结合起来
假设用于丰富的地图相当小,无法保存在内存中,在 Spark 作业中使用该数据的推荐方法是通过广播变量。此类变量的内容将被发送到每个执行程序一次,从而避免以这种方式序列化闭包中捕获的数据集的开销。
广播变量是在驱动程序中实例化的包装器,数据是 'unwrapped' 在闭包中使用 broadcastVar.value
方法。
这将是如何使用 广播变量 与 DStream 的示例:
// could replace with Source.from File as well. This is just more practical
val data = sc.textFile("loopup.txt").map(toKeyValue).collectAsMap()
// declare the broadcast variable
val bcastData = sc.broadcast(data)
... initialize streams ...
socketDStream.map{ elem =>
// doing every step here explicitly for illustrative purposes. Usually, one would typically just chain these calls
// get the map within the broadcast wrapper
val lookupMap = bcastData.value
// use the map to lookup some data
val lookupValue = lookupMap.getOrElse(elem, "not found")
// create the desired result
(elem, lookupValue)
}
socketDStream.saveTo...
我设置了 spark streaming,以便它从套接字读取数据,在将数据发布到 rabbit 队列之前对数据进行一些丰富。 在设置流上下文之前,该扩充项从通过读取常规文本文件 (Source.fromFile...) 实例化的地图中查找信息。
我觉得这不是真正应该做的事情。另一方面,当使用 StreamingContext 时,我只能从流中读取,而不能像使用 SparkContext 那样从静态文件中读取。
我可以尝试允许多个上下文,但我也不确定这是否是正确的方法。
如有任何建议,我们将不胜感激。
如果您的文件很小并且不在分布式文件系统上,Source.fromFile
就可以了(无论完成什么工作)。
如果你想通过SparkContext读取文件,你仍然可以通过streamingContext.sparkContext
访问它,并将它与transform
或foreachRDD
.
假设用于丰富的地图相当小,无法保存在内存中,在 Spark 作业中使用该数据的推荐方法是通过广播变量。此类变量的内容将被发送到每个执行程序一次,从而避免以这种方式序列化闭包中捕获的数据集的开销。
广播变量是在驱动程序中实例化的包装器,数据是 'unwrapped' 在闭包中使用 broadcastVar.value
方法。
这将是如何使用 广播变量 与 DStream 的示例:
// could replace with Source.from File as well. This is just more practical
val data = sc.textFile("loopup.txt").map(toKeyValue).collectAsMap()
// declare the broadcast variable
val bcastData = sc.broadcast(data)
... initialize streams ...
socketDStream.map{ elem =>
// doing every step here explicitly for illustrative purposes. Usually, one would typically just chain these calls
// get the map within the broadcast wrapper
val lookupMap = bcastData.value
// use the map to lookup some data
val lookupValue = lookupMap.getOrElse(elem, "not found")
// create the desired result
(elem, lookupValue)
}
socketDStream.saveTo...