如何在 Spark Streaming 中启用背压(使用 pyspark)
How to enable backpressure in Spark Streaming (using pyspark)
我想知道在 spark streaming
到 pyspark
中启用 backpressure
的正确方法是什么。看来我在短时间内从 Kafka
发来的消息太多了,爆炸了。下面是我的 spark streaming
代码。谁能指出我启用 back pressure
的正确位置?
sc = SparkContext(appName="PythonStreamingDirectKafka")
ssc = StreamingContext(sc, 5)
ssc.checkpoint("/spark_check/")
kvs = KafkaUtils.createDirectStream(ssc, [kafka_topic],
{"metadata.broker.list": bootstrap_servers_ipaddress})
parsed_msg = kvs.map(lambda (key, value): json.loads(value))
## do something below
以下是我在 kafka 流代码中设置背压的方法。
希望对你有帮助。
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("PythonStreamingDirectKafka")\
.set("spark.streaming.backpressure.enabled", "true") \
.set("spark.streaming.backpressure.initialRate", "500")
sc = SparkContext(conf=conf)
我想知道在 spark streaming
到 pyspark
中启用 backpressure
的正确方法是什么。看来我在短时间内从 Kafka
发来的消息太多了,爆炸了。下面是我的 spark streaming
代码。谁能指出我启用 back pressure
的正确位置?
sc = SparkContext(appName="PythonStreamingDirectKafka")
ssc = StreamingContext(sc, 5)
ssc.checkpoint("/spark_check/")
kvs = KafkaUtils.createDirectStream(ssc, [kafka_topic],
{"metadata.broker.list": bootstrap_servers_ipaddress})
parsed_msg = kvs.map(lambda (key, value): json.loads(value))
## do something below
以下是我在 kafka 流代码中设置背压的方法。 希望对你有帮助。
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("PythonStreamingDirectKafka")\
.set("spark.streaming.backpressure.enabled", "true") \
.set("spark.streaming.backpressure.initialRate", "500")
sc = SparkContext(conf=conf)