如何在 Spark Streaming 中启用背压(使用 pyspark)

How to enable backpressure in Spark Streaming (using pyspark)

我想知道在 spark streamingpyspark 中启用 backpressure 的正确方法是什么。看来我在短时间内从 Kafka 发来的消息太多了,爆炸了。下面是我的 spark streaming 代码。谁能指出我启用 back pressure 的正确位置?

sc = SparkContext(appName="PythonStreamingDirectKafka")
ssc = StreamingContext(sc, 5)
ssc.checkpoint("/spark_check/")
kvs = KafkaUtils.createDirectStream(ssc, [kafka_topic],
                                    {"metadata.broker.list": bootstrap_servers_ipaddress})
parsed_msg = kvs.map(lambda (key, value): json.loads(value))
## do something below

以下是我在 kafka 流代码中设置背压的方法。 希望对你有帮助。

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("PythonStreamingDirectKafka")\
        .set("spark.streaming.backpressure.enabled", "true") \
        .set("spark.streaming.backpressure.initialRate", "500")

sc = SparkContext(conf=conf)