Spark Structured Streaming join of a CSV file stream and a rate stream takes too much time per batch
I have a rate stream and a CSV file stream that are joined on the rate value and the CSV file id:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}

def readFromCSVFile(path: String)(implicit spark: SparkSession): DataFrame = {
  val schema = StructType(
    StructField("id", LongType, nullable = false) ::
      StructField("value1", LongType, nullable = false) ::
      StructField("another", DoubleType, nullable = false) :: Nil)
  val spark: SparkSession = SparkSession
    .builder
    .master("local[1]")
    .config(new SparkConf().setIfMissing("spark.master", "local[1]")
      .set("spark.eventLog.dir", "file:///tmp/spark-events")
    ).getOrCreate()
  spark
    .readStream
    .format("csv")
    .option("header", value = true)
    .schema(schema)
    .option("delimiter", ",")
    .option("maxFilesPerTrigger", 1)
    //.option("inferSchema", value = true)
    .load(path)
}
import org.apache.spark.sql.streaming.Trigger

val rate = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 10)
  .load()
  .withWatermark("timestamp", "1 seconds")

val cvsStream = readFromCSVFile(tmpPath.toString)
val cvsStream2 = cvsStream.as("csv")
  .join(rate.as("counter"))
  .where("csv.id == counter.value")
  .withWatermark("timestamp", "1 seconds")

cvsStream2
  .writeStream
  .trigger(Trigger.ProcessingTime(10))
  .format("console")
  .option("truncate", "false")
  .queryName("kafkaDataGenerator")
  .start().awaitTermination(300000)
The CSV file is only 6 rows long, but processing one batch takes about 100 seconds:
2021-10-15 23:21:29 WARN ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 10 milliseconds, but spent 92217 milliseconds
-------------------------------------------
Batch: 1
-------------------------------------------
+---+------+-------+-----------------------+-----+
|id |value1|another|timestamp |value|
+---+------+-------+-----------------------+-----+
|6 |2 |3.0 |2021-10-15 20:20:02.507|6 |
|5 |2 |2.0 |2021-10-15 20:20:01.507|5 |
|1 |1 |1.0 |2021-10-15 20:19:57.507|1 |
|3 |1 |3.0 |2021-10-15 20:19:59.507|3 |
|2 |1 |2.0 |2021-10-15 20:19:58.507|2 |
|4 |2 |1.0 |2021-10-15 20:20:00.507|4 |
+---+------+-------+-----------------------+-----+
How can I optimize the join so this batch is processed faster? It shouldn't need that much computation, so it looks like there is a hidden watermark or something else that makes the batch wait for about 100 seconds. What options/properties can be applied?
I'd suggest you don't have enough data to investigate performance. Why not increase the data to 500,000 rows and see whether the problem remains? Right now my concern is that you aren't running enough data to exercise your system effectively, and the startup cost isn't being amortized appropriately against the data volume.
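As a rough illustration, here is a minimal sketch that generates a 500,000-row CSV matching the question's schema; the output path, row count, and column values are assumptions, not part of the original post:

import org.apache.spark.sql.SparkSession

// Hypothetical test-data generator: writes 500,000 rows with the question's
// schema (id, value1, another) as a single headered CSV file.
val spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.range(1, 500001)                            // id: 1..500000
  .selectExpr(
    "id",
    "id % 10 AS value1",                          // arbitrary test values
    "CAST(id % 100 AS DOUBLE) AS another")
  .coalesce(1)                                    // one output file for simplicity
  .write
  .option("header", "true")
  .csv("/tmp/generator/large-input")              // assumed output directory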
What significantly improved performance? Using spark.read instead of spark.readStream like that, and keeping the DataFrame in memory:
import org.apache.spark.storage.StorageLevel

val dataFrameToBeReturned = spark.read
  .format("csv")
  .schema(schema)
  .option("delimiter", ";")
  .option("maxFilesPerTrigger", 1)
  .csv("hdfs://" + hdfsLocation + homeZeppelinPrefix + "/generator/" + shortPath)
  .persist(StorageLevel.MEMORY_ONLY_SER)
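For context, a sketch of how that cached DataFrame could then be joined with the rate stream as a stream-static join; it reuses rate and dataFrameToBeReturned from the snippets above, and the trigger interval is an assumption:

import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.Trigger

// Stream-static join: the cached, static CSV DataFrame is joined with the
// streaming rate source each micro-batch; no watermark is needed on the static side.
val joined = rate.as("counter")
  .join(dataFrameToBeReturned.as("csv"), expr("csv.id == counter.value"))

joined.writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))  // assumed trigger interval
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination(300000)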