火花 Repeatable/Deterministic 结果
Spark Repeatable/Deterministic Results
我运行正在使用下面的 Spark 代码(基本上创建为 MVE),它执行以下操作:
- 读取parquet并限制
- 分区依据
- 加入
- 过滤器
我很难理解为什么每次我 运行 应用程序时,我在 joined
数据框中得到不同数量的行,即上面第 3 阶段之后的数据框。为什么会这样?
我认为不应该发生的原因是 limit
是确定性的,所以每次相同的行都应该在分区数据框中,尽管顺序不同。在加入中,我加入了分区完成的领域。我希望在一个分区中包含每对组合,但我认为这应该每次都等于相同的数字。
def main(args: Array[String]) {
val maxRows = args(0)
val spark = SparkSession.builder.getOrCreate()
val windowSpec = Window.partitionBy("epoch_1min").orderBy("epoch")
val data = spark.read.parquet("srcfile.parquet").limit(maxRows.toInt)
val partitionDf = data.withColumn("row", row_number().over(windowSpec))
partitionDf.persist(StorageLevel.MEMORY_ONLY)
logger.debug(s"${partitionDf.count()} rows in partitioned data")
val dfOrig = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_orig").withColumnRenamed("row", "row_orig")
val dfDest = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_dest").withColumnRenamed("row", "row_dest")
val joined = dfOrig.join(dfDest, dfOrig("epoch_1min_orig") === dfDest("epoch_1min_dest"), "inner")
logger.debug(s"Rows in joined dataframe ${joined.count()}")
val filtered = joined.filter(col("row_orig") < col("row_dest"))
logger.debug(s"Rows in filtered dataframe ${filtered.count()}")
}
- 如果您启动一个新的应用程序,可能会有基础数据更改。
- 否则,使用 Spark SQL 就像在 RDBMS 上使用 ANSI SQL 一样,在不使用 ORDER BY 时无法保证数据的排序。因此,您不能假设在不同的 Executor 分配情况下第二次处理将相同(没有 ordering/sorting),等等
我运行正在使用下面的 Spark 代码(基本上创建为 MVE),它执行以下操作:
- 读取parquet并限制
- 分区依据
- 加入
- 过滤器
我很难理解为什么每次我 运行 应用程序时,我在 joined
数据框中得到不同数量的行,即上面第 3 阶段之后的数据框。为什么会这样?
我认为不应该发生的原因是 limit
是确定性的,所以每次相同的行都应该在分区数据框中,尽管顺序不同。在加入中,我加入了分区完成的领域。我希望在一个分区中包含每对组合,但我认为这应该每次都等于相同的数字。
def main(args: Array[String]) {
val maxRows = args(0)
val spark = SparkSession.builder.getOrCreate()
val windowSpec = Window.partitionBy("epoch_1min").orderBy("epoch")
val data = spark.read.parquet("srcfile.parquet").limit(maxRows.toInt)
val partitionDf = data.withColumn("row", row_number().over(windowSpec))
partitionDf.persist(StorageLevel.MEMORY_ONLY)
logger.debug(s"${partitionDf.count()} rows in partitioned data")
val dfOrig = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_orig").withColumnRenamed("row", "row_orig")
val dfDest = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_dest").withColumnRenamed("row", "row_dest")
val joined = dfOrig.join(dfDest, dfOrig("epoch_1min_orig") === dfDest("epoch_1min_dest"), "inner")
logger.debug(s"Rows in joined dataframe ${joined.count()}")
val filtered = joined.filter(col("row_orig") < col("row_dest"))
logger.debug(s"Rows in filtered dataframe ${filtered.count()}")
}
- 如果您启动一个新的应用程序,可能会有基础数据更改。
- 否则,使用 Spark SQL 就像在 RDBMS 上使用 ANSI SQL 一样,在不使用 ORDER BY 时无法保证数据的排序。因此,您不能假设在不同的 Executor 分配情况下第二次处理将相同(没有 ordering/sorting),等等