Spark Structured Streaming join of a CSV file stream and a rate stream takes too much time per batch

I have a rate stream and a CSV file stream, joined on the rate value and the CSV file id:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}

def readFromCSVFile(path: String)(implicit spark: SparkSession): DataFrame = {
  val schema = StructType(
    StructField("id", LongType, nullable = false) ::
    StructField("value1", LongType, nullable = false) ::
    StructField("another", DoubleType, nullable = false) :: Nil)

  spark
    .readStream
    .format("csv")
    .option("header", value = true)
    .schema(schema)
    .option("delimiter", ",")
    .option("maxFilesPerTrigger", 1)
    //.option("inferSchema", value = true)
    .load(path)
}

// The session is created once and picked up by readFromCSVFile via its implicit parameter.
implicit val spark: SparkSession = SparkSession
  .builder
  .master("local[1]")
  .config(new SparkConf().setIfMissing("spark.master", "local[1]")
    .set("spark.eventLog.dir", "file:///tmp/spark-events"))
  .getOrCreate()

   val rate = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .option("numPartitions", 10)
      .load()
      .withWatermark("timestamp", "1 seconds")

    val cvsStream = readFromCSVFile(tmpPath.toString)
    val cvsStream2 = cvsStream.as("csv")
      .join(rate.as("counter"))
      .where("csv.id == counter.value")
      .withWatermark("timestamp", "1 seconds")

    cvsStream2
      .writeStream
      .trigger(Trigger.ProcessingTime(10))
      .format("console")
      .option("truncate", "false")
      .queryName("kafkaDataGenerator")
      .start().awaitTermination(300000)

The CSV file is only 6 rows long, but processing one batch takes about 100 seconds:

2021-10-15 23:21:29 WARN  ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 10 milliseconds, but spent 92217 milliseconds
-------------------------------------------
Batch: 1
-------------------------------------------
+---+------+-------+-----------------------+-----+
|id |value1|another|timestamp              |value|
+---+------+-------+-----------------------+-----+
|6  |2     |3.0    |2021-10-15 20:20:02.507|6    |
|5  |2     |2.0    |2021-10-15 20:20:01.507|5    |
|1  |1     |1.0    |2021-10-15 20:19:57.507|1    |
|3  |1     |3.0    |2021-10-15 20:19:59.507|3    |
|2  |1     |2.0    |2021-10-15 20:19:58.507|2    |
|4  |2     |1.0    |2021-10-15 20:20:00.507|4    |
+---+------+-------+-----------------------+-----+

How can I optimize the join so that the batch is processed faster? It should not require that much computation, so it looks as if there is a hidden watermark or something else that makes the batch wait for about 100 seconds. What options/properties can be applied?
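For reference (not part of the original post), one way to see where the trigger time actually goes is to keep the StreamingQuery handle instead of calling awaitTermination directly, and dump its progress, which breaks each micro-batch down by phase. A minimal diagnostic sketch against the same cvsStream2 query:

    // Diagnostic sketch (assumption: cvsStream2 defined as above).
    val query = cvsStream2
      .writeStream
      .trigger(Trigger.ProcessingTime(10))
      .format("console")
      .option("truncate", "false")
      .queryName("kafkaDataGenerator")
      .start()

    while (query.isActive) {
      Thread.sleep(10000)
      // The durationMs section of the progress JSON shows how long each phase
      // (getBatch, addBatch, walCommit, ...) took in the last micro-batch.
      Option(query.lastProgress).foreach(p => println(p.json))
    }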

I would suggest that you don't have enough data to study performance. Why not increase the data to 500,000 rows and see whether the problem persists? Right now I'm concerned that you are not running enough data to exercise the system effectively, and that the startup cost is not being amortized properly over the data volume.
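To follow that suggestion, a larger input file can be generated up front. A minimal sketch; the output path and value distributions are made up here, only the column names come from the question's schema:

    import org.apache.spark.sql.functions.rand

    // Generate ~500,000 rows with the columns id, value1, another and write
    // them as a single headered CSV file (hypothetical path under tmpPath).
    spark.range(0, 500000)
      .withColumn("value1", (rand() * 10).cast("long"))
      .withColumn("another", rand() * 100)
      .coalesce(1)
      .write
      .option("header", "true")
      .csv(tmpPath.toString + "/bigger-input")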

What improved performance significantly? Using spark.read instead of spark.readStream as above, and keeping the DataFrame in memory:

import org.apache.spark.storage.StorageLevel

val dataFrameToBeReturned = spark.read
  .format("csv")
  .schema(schema)
  .option("delimiter", ";")
  .option("maxFilesPerTrigger", 1)
  .csv("hdfs://" + hdfsLocation + homeZeppelinPrefix + "/generator/" + shortPath)
  .persist(StorageLevel.MEMORY_ONLY_SER)
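With the CSV data loaded as a static, persisted DataFrame, the join against the rate stream becomes a stream-static join. A minimal sketch of how the pieces above could be wired together; variable names are reused from the snippets, not taken verbatim from the answer:

    import org.apache.spark.sql.functions.expr

    // Stream-static join: the persisted CSV DataFrame is joined against
    // every micro-batch of the rate stream on id == value.
    val joined = rate.as("counter")
      .join(dataFrameToBeReturned.as("csv"), expr("csv.id = counter.value"))

    joined.writeStream
      .trigger(Trigger.ProcessingTime(10))
      .format("console")
      .option("truncate", "false")
      .queryName("kafkaDataGenerator")
      .start()
      .awaitTermination(300000)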