如何降低Kafka Producer的写入速度?

How to slow down the write speed of Kafka Producer?

我是这样用spark写数据到kafka的

df.write(). format("kafka"). save()

能否控制写入kafka的速度,避免对kafka造成压力? 有没有一些选项可以帮助降低速度?

我认为将 linger.ms 设置为非零值会有所帮助。因为它控制在发送当前批次之前等待其他消息的时间量。代码可能如下所示

df.write.format("kafka").option("linger.ms", "100").save()

但这真的取决于很多事情。如果你的 Kafka 'big' 足够并且配置正确,我不会太担心速度。毕竟kafka就是为了应对这种情况(流量高峰)而设计的。

通常情况下,结构化流式处理会默认尝试尽可能快地处理数据。每个源中都有选项可以控制处理速率,例如 File 源中的 maxFilesPerTrigger 和 Kafka 源中的 maxOffsetsPerTrigger。

val streamingETLQuery = cloudtrailEvents
  .withColumn("date", $"timestamp".cast("date") // derive the date
  .writeStream
  .trigger(ProcessingTime("10 seconds")) // check for files every 10s
  .format("parquet") // write as Parquet partitioned by date
  .partitionBy("date")
  .option("path", "/cloudtrail")
  .option("checkpointLocation", "/cloudtrail.checkpoint/")
  .start()

val df = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("text-logs")

阅读以下链接了解更多详情:

https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-FileStreamSource.html https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html