Spark Streaming:为什么处理几 MB 的用户状态的内部处理成本如此之高?

Spark Streaming: Why internal processing costs are so high to handle user state of a few MB?

根据我们的实验,我们发现当状态超过一百万个对象时,有状态的 Spark Streaming 内部处理成本会花费大量时间。结果延迟受到影响,因为我们必须增加批处理间隔以避免不稳定的行为(处理时间 > 批处理间隔)。

它与我们的应用程序的细节无关,因为它可以通过下面的代码重现。

Spark 内部 processing/infrastructure 花费这么多时间来处理用户状态的成本到底是什么?除了简单地增加批次间隔之外,还有其他方法可以减少处理时间吗?

我们计划广泛使用状态:在几个节点中的每一个节点上至少有 100MB 左右,以将所有数据保存在内存中,并且每小时只转储一次。

增加批次间隔会有所帮助,但我们希望保持批次间隔最小。

原因可能不是space被状态占用,而是大对象图,因为当我们将列表更改为大基元数组时,问题就消失了。

只是一个猜测:它可能与 Spark 内部使用的 org.apache.spark.util.SizeEstimator 有关,因为它会在分析时不时出现。

这是在现代 iCore7 上重现上图的简单演示:

代码:

package spark;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SlowSparkStreamingUpdateStateDemo {

    // Very simple state model
    static class State implements Serializable {
        final List<String> data;
        State(List<String> data) {
            this.data = data;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                // Tried KryoSerializer, but it does not seem to help much
                //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .setMaster("local[*]")
                .setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
        javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)

        List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
        System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));
        JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);

        JavaPairDStream<String, State> stream = javaStreamingContext
                .textFileStream(".") // fake: effectively, no input at all
                .mapToPair(input -> (Tuple2<String, State>) null) //  fake to get JavaPairDStream
                .updateStateByKey(
                        (inputs, maybeState) -> maybeState, // simplest possible dummy function
                        new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
                        initialRdd); // set generated state

        stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
            System.out.println("Is empty: " + rdd.isEmpty());
            return null;
        });

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }

    private static List<Tuple2<String, State>> prepareInitialRddData() {
        // 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize'
        int stateCount = 1000;
        int dataListSize = 200;
        int elementDataSize = 10;
        List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
        for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
            List<String> stateData = new ArrayList<>(dataListSize);
            for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
                stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
            }
            initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
        }
        return initialRddInput;
    }

}

spark 1.6 中改进了状态管理。
请参考 SPARK-2629 改进的 Spark Streaming 状态管理;

并且在详细设计规范中:
Improved state management in Spark Streaming

一个性能缺点如下:

需要不扫描每个键的更优化的状态管理 当前 updateStateByKey 扫描每个批次间隔中的每个键,即使该键没有数据。虽然这种语义对某些工作负载很有用,但大多数工作负载只需要“扫描和更新有新数据的状态”。在每个批次间隔中,只需要触及所有状态的一小部分。 The cogroup-based implementation of updateStateByKey is not designed for this; cogroup scans all the keys every time. In fact, this causes the batch processing times of updateStateByKey to increase with the number of keys in the state, even if the data rate stays fixed.