Spark Streaming 作业 运行 非常慢

Spark Streaming Job is running very slow

我在本地 运行 进行火花流作业,一批大约需要 4 到 5 分钟。有人可以建议波纹管代码可能有什么问题吗?

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType
from pyspark.sql.functions import avg, window, from_json, from_unixtime, unix_timestamp
import uuid

schema = StructType([
    StructField("source", StringType(), True),
    StructField("temperature", FloatType(), True),
    StructField("time", StringType(), True)
])

spark = SparkSession \
    .builder.master("local[8]") \
    .appName("poc-app") \
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 5)    

df1 = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "poc") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

df2 = df1.select(from_json("value", schema).alias(
    "sensors")).select("sensors.*")

df3=df2.select(df2.source,df2.temperature,from_unixtime(unix_timestamp(df2.time, 'yyyy-MM-dd HH:mm:ss')).alias('time'))
df4 = df3.groupBy(window(df3.time, "2 minutes","1 minutes"), df3.source).count()

query1 = df4.writeStream \
.outputMode("complete") \
.format("console") \
.option("checkpointLocation", "/tmp/temporary-" + str(uuid.uuid4())) \
.start() 

query1.awaitTermination()

对于小批量流式处理,您通常希望减少输出分区的数量...因为每次您都在进行一些聚合(广泛转换),它将默认为磁盘上的 200 个分区,因为

spark.conf.get("spark.sql.shuffle.partitions")

尝试将此配置降低到较小的输出分区并将其放在代码的开头,以便在执行聚合时将 5 个分区输出到磁盘

spark.conf.set("spark.sql.shuffle.partitions", 5)

您还可以通过查看输出写入流目录中的文件数量以及确定聚合 df 中的分区数量来获得感觉

df3.rdd.getNumPartitions()

顺便说一句,因为您正在使用本地模式进行测试,请尝试设置为 local[8] 而不是 local[4],这样它会增加 cpu 内核的并行性(我假设您有 4 个)