Is the Spark kafka-stream-reader caching data?
I think this is a good question; I may be able to find the answer in the spark-kafka-streaming source code, and I will do so if no one here can answer it.
Imagine a scenario like this:

val dstream = ...
dstream.foreachRDD { rdd =>
  rdd.count()
  rdd.collect()
}
In the sample code above, we get a micro-batch from the dstream and, for each batch, we trigger two actions:

- count() how many rows there are
- collect() all the rows

Following Spark's lazy-evaluation behavior, both actions trace back to the origin of the data source (the Kafka topic), and since we have no persist() call or wide transformation in our code logic, nothing would make Spark cache the data it read from Kafka.
So here is the question: will Spark read from Kafka once or twice? This matters a lot for performance, because reading from Kafka involves network I/O and may put extra pressure on the Kafka brokers. So if the spark-kafka-streaming lib does not cache the data, we should definitely cache()/persist() it before running multiple actions.

Discussion is welcome. Thanks.
Edit:

I just found some docs on the official Spark website, and it looks like the receivers on the executors do cache the data. But I don't know whether this applies only to separate receivers, because I have read that the spark-kafka-streaming lib does not use a separate receiver: it receives the data and processes it on the same core.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization
Input data: By default, the input data received through Receivers is stored in the executors’ memory with StorageLevel.MEMORY_AND_DISK_SER_2. That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all of the input data necessary for the streaming computation. This serialization obviously has overheads – the receiver must deserialize the received data and re-serialize it using Spark’s serialization format.
There is no implicit caching when using DStreams, so unless you cache explicitly, every evaluation will hit the Kafka brokers.

If you evaluate multiple times, and the brokers are not co-located with the Spark nodes, you should definitely consider caching.
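A minimal sketch of what explicit caching inside foreachRDD could look like (the dstream is assumed to be a Kafka-backed DStream as in the question; persist/unpersist and StorageLevel are the standard Spark RDD API, but this is an untested sketch, not a verified benchmark):

```scala
import org.apache.spark.storage.StorageLevel

dstream.foreachRDD { rdd =>
  // Cache the batch once, so the two actions below do not each
  // trigger a separate read from the Kafka brokers.
  rdd.persist(StorageLevel.MEMORY_AND_DISK)

  val n    = rdd.count()   // first action: materializes and caches the batch
  val rows = rdd.collect() // second action: served from the cache

  // Release the cached blocks before the next micro-batch arrives,
  // otherwise cached batches accumulate in executor memory.
  rdd.unpersist()
}
```

The explicit unpersist() at the end of each batch is important in streaming jobs: unlike batch jobs, the loop runs indefinitely, so leaked cached RDDs would steadily consume executor storage memory.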
According to Spark's official documentation:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization
Input data: By default, the input data received through Receivers is stored in the executors’ memory with StorageLevel.MEMORY_AND_DISK_SER_2. That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all of the input data necessary for the streaming computation. This serialization obviously has overheads – the receiver must deserialize the received data and re-serialize it using Spark’s serialization format.