Spark Streaming Job is running very slow
I am running a Spark Structured Streaming job locally, and one micro-batch takes about 4 to 5 minutes. Can someone suggest what might be wrong with the code below?
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType
from pyspark.sql.functions import avg, window, from_json, from_unixtime, unix_timestamp
import uuid

schema = StructType([
    StructField("source", StringType(), True),
    StructField("temperature", FloatType(), True),
    StructField("time", StringType(), True)
])

spark = SparkSession \
    .builder.master("local[8]") \
    .appName("poc-app") \
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 5)

df1 = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "poc") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

df2 = df1.select(from_json("value", schema).alias("sensors")).select("sensors.*")

df3 = df2.select(df2.source, df2.temperature,
                 from_unixtime(unix_timestamp(df2.time, 'yyyy-MM-dd HH:mm:ss')).alias('time'))

df4 = df3.groupBy(window(df3.time, "2 minutes", "1 minutes"), df3.source).count()

query1 = df4.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/tmp/temporary-" + str(uuid.uuid4())) \
    .start()

query1.awaitTermination()
For small-batch streaming you generally want to reduce the number of output partitions. Every time you perform an aggregation (a wide transformation), Spark will by default write 200 shuffle partitions to disk, because of
spark.conf.get("spark.sql.shuffle.partitions")
Try lowering this setting to a small number of output partitions, and place it at the start of your code, so that only 5 partitions are written to disk when the aggregation runs:
spark.conf.set("spark.sql.shuffle.partitions", 5)
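As a minimal sketch (assuming the same local setup as in the question, and not claiming this is the only way), you can also pass the value on the SparkSession builder so the lower shuffle-partition count is already in effect before the first micro-batch is planned, and then read it back to confirm:

from pyspark.sql import SparkSession

# Sketch: set the shuffle-partition count at session creation time.
spark = SparkSession \
    .builder \
    .master("local[8]") \
    .appName("poc-app") \
    .config("spark.sql.shuffle.partitions", 5) \
    .getOrCreate()

# Verify the value the session actually picked up.
print(spark.conf.get("spark.sql.shuffle.partitions"))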
You can also get a feel for this by looking at the number of files in the write-stream output directory, and by checking the number of partitions in the aggregated DataFrame:
df3.rdd.getNumPartitions()
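A related way to see where the time goes (not part of the original answer, just a sketch using the standard StreamingQuery monitoring API, and assuming the query1 handle from the code above) is to poll the query's progress reports, which include per-batch row counts and durations, so you can check whether lowering spark.sql.shuffle.partitions actually shortens the batches:

import time

# Sketch: wait for the first micro-batch to finish, then inspect its metrics.
while query1.lastProgress is None:
    time.sleep(5)

progress = query1.lastProgress          # dict with per-batch metrics
print(progress["batchId"], progress["numInputRows"])
print(progress["durationMs"])           # e.g. addBatch / triggerExecution times in ms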
By the way, since you are testing in local mode, try setting local[8] instead of local[4]; that increases the parallelism across your CPU cores (I am assuming you have 4).
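If you are not sure how many cores the machine has, one option (a sketch, not from the original answer) is to let Spark use all available cores with local[*]:

import multiprocessing
from pyspark.sql import SparkSession

# Sketch: local[*] resolves to the number of available cores, shown here
# for reference via the standard library.
print(multiprocessing.cpu_count())

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("poc-app") \
    .getOrCreate()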