使用 Structured Streaming with Spark 每个查询需要更多时间

Each query takes more time using Structured Streaming with Spark

我正在使用 Spark 2.3.0、Scala 2.11.8 和 Kafka,我正在尝试将来自 Kafka 的所有消息和结构化流写入 parquet 文件,但对于每个查询,我的实现执行的总时间为每一个都增加很多Spark Stages Image。 我想知道为什么会发生这种情况,我尝试了不同的可能触发器(继续、0 秒、1 秒、10 秒、10 分钟等),但我总是得到相同的行为。我的代码有这样的结构:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, SparkSession}
import com.name.proto.ProtoMessages
import java.io._
import java.text.{DateFormat, SimpleDateFormat}
import java.util.Date
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

object StructuredStreaming {

  def message_proto(value:Array[Byte]): Map[String, String] = {     

    try {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val impression_proto = ProtoMessages.TrackingRequest.parseFrom(value)

      val json = Map(
       "id_req" -> (impression_proto.getIdReq().toString),
       "ts_imp_request" -> (impression_proto.getTsRequest().toString),
       "is_after" -> (impression_proto.getIsAfter().toString),
       "type" -> (impression_proto.getType().toString)
      )    
      return json

    }catch{
      case e:Exception=>
        val pw = new PrintWriter(new File("/home/data/log.log" ))
        pw.write(e.toString)
        pw.close()

        return Map("error" -> "error")       
    }
  }

  def main(args: Array[String]){

    val proto_impressions_udf = udf(message_proto _)
    val spark = SparkSession.builder.appName("Structured Streaming ").getOrCreate()
    //fetchOffset.numRetries, fetchOffset.retryIntervalMs
    val stream = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "ip:9092")
      .option("subscribe", "ssp.impressions")
      .option("startingOffsets", "latest")
      .option("max.poll.records", "1000000")
      .option("auto.commit.interval.ms", "100000")
      .option("session.timeout.ms", "10000")
      .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
      .option("failOnDataLoss", "false")
      .option("latestFirst", "true")
      .load()

    try{
      val query = stream.select(col("value").cast("string"))
        .select(proto_impressions_udf(col("value")) as "value_udf")
        .select(col("value_udf")("id_req").as("id_req"), col("value_udf")("is_after").as("is_after"),
          date_format(col("value_udf")("ts_request"), "yyyy").as("date").as("year"),
          date_format(col("value_udf")("ts_request"), "MM").as("date").as("month"),
          date_format(col("value_udf")("ts_request"), "dd").as("date").as("day"),
          date_format(col("value_udf")("ts_request"), "HH").as("date").as("hour"))
      val query2 = query.writeStream.format("parquet")
                        .option("checkpointLocation", "/home/data/impressions/checkpoint")
                        .option("path", "/home/data/impressions")
                        .outputMode(OutputMode.Append())
                        .partitionBy("year", "month", "day", "hour")
                        .trigger(Trigger.ProcessingTime("1 seconds"))
                        .start()           
    }catch{    
      case e:Exception=>
        val pw = new PrintWriter(new File("/home/data/log.log" ))
        pw.write(e.toString)
        pw.close()    
    }    
  }
}

我附上了来自 Spark 的其他图片 UI:

你的问题与批次有关,你需要为每个批次定义一个好的处理时间,这取决于你的集群处理能力。此外,解决每个批次的时间取决于您是否收到所有不为空的字段,因为如果您收到大量空字段,则处理该批次的过程将花费更少的时间。