分区数如何影响 Spark Kafka 连接?

How does number of partitions affect Spark Kafka Connections?

我正在启动 EMR 以将数据帧发布到 kafka(大约 300-400 行)。我能够发布它并且数据框有 200 个分区。在发布数据框时,我看到 kafka 集群中 CPU 的巨大峰值持续了大约 20-30 分钟。分区号是否创建 200 个连接?

或者它是否使用此处所述的 1 个连接。 http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#producer-caching

示例代码

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
import org.apache.spark.sql.functions.col
val kafkaOptions = Map("kafka.bootstrap.servers" -> s"$host:$port",
        "kafka.security.protocol" -> "SSL",
        "kafka.ssl.endpoint.identification.algorithm" -> "",
        "kafka.ssl.truststore.location" -> "/home/hadoop/client.truststore.jks",
        "kafka.ssl.truststore.password" -> "password",
        "kafka.ssl.keystore.type" -> "PKCS12",
        "kafka.ssl.key.password" -> "password",
        "kafka.ssl.keystore.location" -> "/home/hadoop/client.keystore.p12",
        "kafka.ssl.keystore.password" -> "password")
    )

 val df = spark
        .read
        .option("header", true)
        .option("escape", "\"")
        .csv("s3://bucket/file.csv")

 val publishToKafkaDf = df.withColumn("value", col("body"))

 publishToKafkaDf
      .selectExpr( "CAST(value AS STRING)")
      .write
      .format("kafka")
      .option("topic", "test-topic")
      .options(kafkaOptions)
      .save()

I am able to publish it and the dataframe has 200 partitions. While publishing the dataframe I see a huge spike in CPU in the kafka cluster for about 20-30 mins. Does the partition number create 200 connections?

根据 Producer Caching 提到的:

Spark initializes a Kafka producer instance and co-use across tasks for same caching key.

这告诉我,将有一个 Kafka 生产者在单个执行器上共享所有任务。 (虽然我还没有检查来源,所以我不太确定。)

换句话说,分区(即执行时的任务)的数量在可用的执行器之间共享。如果你有 10 个执行者,我的理解是会有 10 个 Kafka 生产者。


请注意该文档适用于最新的 Spark 3.0.0,而您使用的 Spark 2.3.0 基于:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0

我认为这并不重要,因为这个 Kafka-producer-per-executor 已经在早期版本中使用过。他们可能在 3.0 中改进了共享和缓存。