Spark Streaming - Same processing time for 4 cores and 16 cores. Why?
Scenario: I am doing some testing with Spark Streaming. A file of roughly 100 records arrives every 25 seconds.
Issue: On my 4-core PC, processing takes an average of 23 seconds with the program using local[*]. When I deployed the same application to a server with 16 cores, I expected the processing time to improve. However, it still takes the same time on 16 cores (I also checked the CPU usage in Ubuntu, and the CPUs are being fully utilized). All configuration is the Spark default.
Question:
Shouldn't the processing time decrease as the number of cores available to the streaming job increases?
Code:
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName(this.getClass.getCanonicalName)
  .set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(25))
val sqc = new SQLContext(sc)
val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val broadcastTable = sc.broadcast(gpsLookUpTable)

jsonBuilder.append("[")
ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
  .foreachRDD { rdd =>
    if (!rdd.partitions.isEmpty) {
      val header = rdd.first().split(",")
      val rowsWithoutHeader = Utils.dropHeader(rdd)
      rowsWithoutHeader.foreach { row =>
        jsonBuilder.append("{")
        val singleRowArray = row.split(",")
        (header, singleRowArray).zipped
          .foreach { (x, y) =>
            jsonBuilder.append(convertToStringBasedOnDataType(x, y))
            // GEO Hash logic here
            if (x.equals("GPSLat") || x.equals("Lat")) {
              lattitude = y.toDouble
            }
            else if (x.equals("GPSLon") || x.equals("Lon")) {
              longitude = y.toDouble
              if (x.equals("Lon")) {
                // This section is used to convert GPS Look Up to GPS LookUP with Hash
                jsonBuilder.append(convertToStringBasedOnDataType("geoCode", GeoHash.encode(lattitude, longitude)))
              }
              else {
                val selectedRow = broadcastTable.value
                  .filter("geoCode LIKE '" + GeoHash.subString(lattitude, longitude) + "%'")
                  .withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
                  .orderBy("Distance")
                  .select("TrackKM", "TrackName").take(1)
                if (selectedRow.length != 0) {
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackKm", selectedRow(0).get(0)))
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackName", selectedRow(0).get(1)))
                }
                else {
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackKm", "NULL"))
                  jsonBuilder.append(convertToStringBasedOnDataType("TrackName", "NULL"))
                }
              }
            }
          }
        jsonBuilder.setLength(jsonBuilder.length - 1)
        jsonBuilder.append("},")
      }
      sc.parallelize(Seq(jsonBuilder.toString)).repartition(1).saveAsTextFile("hdfs://localhost:9000/outputDirectory")
    }
  }
It sounds like you are only using one thread, and if that is the case it does not matter whether the application runs on a 4-core or a 16-core machine.
It sounds like one file comes in, that one file is one RDD partition with 100 rows. You iterate over the rows of that RDD and append to jsonBuilder. At the end you call repartition(1), which makes the write of the output file single-threaded as well.
You could repartition the dataset into, say, 12 RDD partitions after you pick up the file, to ensure that other threads work on the rows. But unless I am missing something, you are lucky this is not happening: what would happen if two threads called jsonBuilder.append("{") at the same time? Wouldn't they produce invalid JSON? I could be missing something here.
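For illustration only, here is a minimal sketch of that idea; it is not your code, and rowToJson is a hypothetical helper standing in for the header-zipping and geo-hash logic above. The point is to build each row's JSON string inside the tasks with map, so nothing appends to a shared StringBuilder on the driver, and to repartition so several threads actually get work:

// Sketch only, reusing the paths from the question; rowToJson is a placeholder.
ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
  .foreachRDD { rdd =>
    if (!rdd.partitions.isEmpty) {
      val header = rdd.first().split(",")
      Utils.dropHeader(rdd)
        .repartition(12)                    // spread the ~100 rows across several tasks/threads
        .map(row => rowToJson(header, row)) // each task builds its own strings, no shared jsonBuilder
        .saveAsTextFile("hdfs://localhost:9000/outputDirectory")
    }
  }

Note that this writes one JSON object per line rather than one big array, so the output format changes; it is only meant to show where the parallelism would come from.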
You can test whether I am right about your application being single-threaded by adding logging like this:
scala> val rdd1 = sc.parallelize(1 to 10).repartition(1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at repartition at <console>:21
scala> rdd1.foreach{ r => {println(s"${Thread.currentThread.getName()} => $r")} }
Executor task launch worker-40 => 1
Executor task launch worker-40 => 2
Executor task launch worker-40 => 3
Executor task launch worker-40 => 4
Executor task launch worker-40 => 5
Executor task launch worker-40 => 6
Executor task launch worker-40 => 7
Executor task launch worker-40 => 8
Executor task launch worker-40 => 9
Executor task launch worker-40 => 10
scala> val rdd3 = sc.parallelize(1 to 10).repartition(3)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at repartition at <console>:21
scala> rdd3.foreach{ r => {println(s"${Thread.currentThread.getName()} => $r")} }
Executor task launch worker-109 => 1
Executor task launch worker-108 => 2
Executor task launch worker-95 => 3
Executor task launch worker-95 => 4
Executor task launch worker-109 => 5
Executor task launch worker-108 => 6
Executor task launch worker-108 => 7
Executor task launch worker-95 => 8
Executor task launch worker-109 => 9
Executor task launch worker-108 => 10
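If you want to run the same check inside your streaming job instead of in the shell, a sketch (assuming the rest of your per-row loop stays as it is) would be to print the thread name at the top of the row closure:

// Sketch: log which executor thread handles each row of the batch.
rowsWithoutHeader.foreach { row =>
  println(s"${Thread.currentThread.getName()} => $row")
  // ... existing per-row JSON building ...
}

If every line shows the same worker name, the batch really is being processed by a single thread, no matter how many cores the machine has.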