为什么 apache spark 中的这两个阶段计算的是同一件事?
Why those two stages in apache spark are computing same thing?
我是 spark 的新手,我有两个很长的 运行 阶段,它们在做几乎相同的事情。下面是我的伪代码。
var metaData = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load(csvFile)
val met = broadcast(metaData.dropDuplicates(Seq("col1")))
val accessLogs = sc.textFile(logFile).filter(line => regex.pattern.matcher(line).matches).map(line => LogParser.parseLogLine(line)).toDF()
val joinOutput = accessLogs.join(met,accessLogs("col1") === met("col1"),"left_outer")
val uniqueDfNames2 = Seq("col0", "col1", "col2", "col3","col4")
val sparseFilter = joinOutput
.filter(joinOutput.col("col1").isNotNull)
.filter(joinOutput.col("col2").isNotNull)
.flatMap(row=>ListParser.parseLogLine(row))
sparseFilter.cache()
val uniqueCount = sparseFilter
.filter{r=>r.col0 != null && r.col0 != "" }
.map{
case(KeyValParse(col0,col1,col2,col3,col4,col5))=>((col0,col1,col2,col3,col4,col5),1)
}
.distinct().cache()
.map {case ((col0,col1,col2,col3,col4),count) => ((col0,col1,col2,col3,col4),1)
}
.reduceByKey(_+_)
.map {case ((col0,col1,col2,col3,col4),count) => (col0,col1,col2,col3,col4,count)
}
.toDF(uniqueDfNames: _*).cache()
val totalCount = sparseFilter
.map{
case(Parse(col0,col1,col2,col3,col4,col5))=>((col0,col1,col2,col3,col4),1)
}
.reduceByKey(_+_)
.map{
case ((col0,col1,col2,col3,col4),totcount) => (col0,col1,col2,col3,col4,totcount)
}
.toDF(uniqueDfNames2: _*)
.join(uniqueCount,Seq("col0", "col1", "col2", "col3"),"left")
.select($"col0",$"col1",$"col2",$"col3",$"unicount",$"totcount")
.orderBy($"unicount".desc)
.toDF(totalDfNames: _*)
totalCount
.select("*")
.write
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", "|")
.save(countPath)
我在这里要做的是根据一些参数从日志中生成唯一和总计数。
一切正常,但有两个长 运行 阶段共享几乎相同的 DAG。
下面是两个阶段的镜头。
请查看下面给出的两个阶段的屏幕截图。
直到平面图任务,他们都做同样的事情。
为什么这些不合并为一个阶段?
为什么第 11 阶段 re-read 再次文件并再次进行所有计算是我无法猜测的?
对于具有 10 个执行程序(7 个内核,15Gb RAM)的 20Gb 数据,它需要将近 30 分钟才能完成,但我觉得这可以减少到相当短的时间。
如有任何指导,我们将不胜感激。
PS:- 对不起我的图片编辑技巧:)
RDD 在操作中第一次计算时被缓存。您代码中的第一个操作是 "distinct",即缓存 "sparseFilter" RDD 时。所以第一个缓存操作可能对后续阶段没有用。第一阶段的输出是一个不同的 RDD,但稍后您指的是 sparseFilter。所以Spark不得不重新计算RDD。
我认为逻辑可以稍微改变一下。如果我理解正确的话,对于 totalCount 和 uniqueCount,代码使用相同的列集(col0、col1、col2、col3、col4)。那么在totalCount的计算中,在reduceByKey之后,简单的count应该给uniqueCount吧?可以通过这种方式避免额外的 distinct、reduceByKey、join 等。
我是 spark 的新手,我有两个很长的 运行 阶段,它们在做几乎相同的事情。下面是我的伪代码。
var metaData = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load(csvFile)
val met = broadcast(metaData.dropDuplicates(Seq("col1")))
val accessLogs = sc.textFile(logFile).filter(line => regex.pattern.matcher(line).matches).map(line => LogParser.parseLogLine(line)).toDF()
val joinOutput = accessLogs.join(met,accessLogs("col1") === met("col1"),"left_outer")
val uniqueDfNames2 = Seq("col0", "col1", "col2", "col3","col4")
val sparseFilter = joinOutput
.filter(joinOutput.col("col1").isNotNull)
.filter(joinOutput.col("col2").isNotNull)
.flatMap(row=>ListParser.parseLogLine(row))
sparseFilter.cache()
val uniqueCount = sparseFilter
.filter{r=>r.col0 != null && r.col0 != "" }
.map{
case(KeyValParse(col0,col1,col2,col3,col4,col5))=>((col0,col1,col2,col3,col4,col5),1)
}
.distinct().cache()
.map {case ((col0,col1,col2,col3,col4),count) => ((col0,col1,col2,col3,col4),1)
}
.reduceByKey(_+_)
.map {case ((col0,col1,col2,col3,col4),count) => (col0,col1,col2,col3,col4,count)
}
.toDF(uniqueDfNames: _*).cache()
val totalCount = sparseFilter
.map{
case(Parse(col0,col1,col2,col3,col4,col5))=>((col0,col1,col2,col3,col4),1)
}
.reduceByKey(_+_)
.map{
case ((col0,col1,col2,col3,col4),totcount) => (col0,col1,col2,col3,col4,totcount)
}
.toDF(uniqueDfNames2: _*)
.join(uniqueCount,Seq("col0", "col1", "col2", "col3"),"left")
.select($"col0",$"col1",$"col2",$"col3",$"unicount",$"totcount")
.orderBy($"unicount".desc)
.toDF(totalDfNames: _*)
totalCount
.select("*")
.write
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", "|")
.save(countPath)
我在这里要做的是根据一些参数从日志中生成唯一和总计数。
一切正常,但有两个长 运行 阶段共享几乎相同的 DAG。
下面是两个阶段的镜头。
请查看下面给出的两个阶段的屏幕截图。
直到平面图任务,他们都做同样的事情。 为什么这些不合并为一个阶段? 为什么第 11 阶段 re-read 再次文件并再次进行所有计算是我无法猜测的?
对于具有 10 个执行程序(7 个内核,15Gb RAM)的 20Gb 数据,它需要将近 30 分钟才能完成,但我觉得这可以减少到相当短的时间。
如有任何指导,我们将不胜感激。
PS:- 对不起我的图片编辑技巧:)
RDD 在操作中第一次计算时被缓存。您代码中的第一个操作是 "distinct",即缓存 "sparseFilter" RDD 时。所以第一个缓存操作可能对后续阶段没有用。第一阶段的输出是一个不同的 RDD,但稍后您指的是 sparseFilter。所以Spark不得不重新计算RDD。
我认为逻辑可以稍微改变一下。如果我理解正确的话,对于 totalCount 和 uniqueCount,代码使用相同的列集(col0、col1、col2、col3、col4)。那么在totalCount的计算中,在reduceByKey之后,简单的count应该给uniqueCount吧?可以通过这种方式避免额外的 distinct、reduceByKey、join 等。