如何用socket数据源优化流式聚合?

How to optimize streaming aggregation with socket data source?

我在 4 CPU 个内核和 8 个线程上使用 Spark 2.4.0 和 Scala 2.11。

我编写了以下应用程序:

package demos.spark

object WordCounter {

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession
      .builder
      .master("local[4]")
      .getOrCreate
    import spark.implicits._
    spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load
      .as[String]
      .flatMap(_.split("\W+"))
      .groupBy("value")
      .count
      .writeStream
      .outputMode("complete")
      .format("console")
      .start
      .awaitTermination
  }
}

local[1] 申请的处理时间约为 60 秒。对于 local[8],它下降到 ~15 秒,这是我得到的最小值。

我总是通过套接字发送一两个句子作为输入。

这是预期的行为吗?如何优化应用程序使其具有 1 秒的处理时间?

编辑: 在这个问题上花了很长时间,终于找到了解决方案。问题在于 Spark 默认使用的分区太多(几百个)。添加 spark.sql.shuffle.partitions 选项设置为 8(我机器上的核心数)后,数据处理的持续时间已减少到 300-400 毫秒

val spark = SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", 8)
  .getOrCreate

我还不知道,这个数字是否应该保持不变,如果 Spark 应用程序 运行 在可能发生变化的基础设施(Spark、Kubernetes、AWS、自动缩放)上会怎么样?

4 CPU cores and 8 threads.

使用 local[*],Spark 将使用与核心数一样多的处理线程,即 4 个。如果这 8 个线程是虚拟核心,Spark 将看到 8 个“CPU 核心”,因此 8 是处理的最大线程数。

这正是您的测试所证明的,即

For local[8] it drops to ~15 seconds and that is the minimal value I have ever got.

Is it an expected behaviour?

是的,除非您更改处理逻辑,即结构化查询本身,否则几乎不可能赶上时间。这就是我通常所说的考虑算法的地方(根据要处理的数据可能会有所不同)。您受到 CPU 可用内核数量的限制。

How to optimize the application to have a 1-second processing time?

更改您的结构化查询(“算法”)或它在幕后的工作方式。

以下操作为处理逻辑:

.flatMap(_.split("\W+"))
.groupBy("value")
.count

flatMap 很便宜,而且可以和 CPU 个核心一样快。你对此无能为力。

您还使用流式聚合 groupBy 后跟 count 来更改执行所需的任务数(在您的情况下,它是从 8 到默认的洗牌分区数,即 200).

您可以计算在 8 个内核上 运行 200 个任务所需的 CPU 个滴答数,您将需要那么多时间来计算结果。

The problem was in too many partisions (few hundreds) that were default used by Spark. After adding spark.sql.shuffle.partitions option set to 8 (number of cores on my machine), the duration of data processing have been declined to 300-400 ms

当然,这对这种特殊情况有所帮助,如果这是您可能拥有的唯一硬件,那也没关系。大功告成。

其他核心数量可能更高的环境呢?

if this number should be constant or not, what if Spark application will be running on infrastructure that could be changing (Spark, Kubernetes, AWS, autoscaling)?

这是最难回答的问题。欢迎来到 Apache Spark 非常动态/高度可配置的世界。影响最终结果的因素太多,通常你所拥有的就是你应该得到的,或者你开始调整 many configuration options 并且你将不得不花费数小时或数周来确定最佳配置应该是什么。考虑您的流式查询将处理的不同数据(数据形状、数量和速度)。它增加了混乱。

戴着咨询的帽子,在某些时候你将不得不决定应用程序性能是否足够好,或者你将花费数周时间希望你能做得比你已经取得的成就更好(并且有人必须付出代价为此)。

if this number should be constant or not

如果你知道你将要处理的所有数据,那么你就可以做出如此艰难的假设。

一般情况下不应该,这就是 Spark 为您提供 Adaptive Query Execution (video) 的原因。