Spark:将数据写入 Kafka 时如何使用自定义分区器
Spark: How to use custom partitionner when writing data to Kafka
向kafka写入数据时,可以使用名为key的列来选择分区:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
我需要手动决定分区,独立于键。是否可以手动指定分区?或者提供自定义分区程序,以便我控制选择分区的逻辑是什么?
您只需使用具有适当逻辑的自定义分区程序添加选项 kafka.partitioner.class
。
val dataStreamWriter: DataStreamWriter[Row] = ???
dataStreamWriter.option("kafka.partitioner.class", "com.example.CustomKafkaPartitioner")
向kafka写入数据时,可以使用名为key的列来选择分区:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
我需要手动决定分区,独立于键。是否可以手动指定分区?或者提供自定义分区程序,以便我控制选择分区的逻辑是什么?
您只需使用具有适当逻辑的自定义分区程序添加选项 kafka.partitioner.class
。
val dataStreamWriter: DataStreamWriter[Row] = ???
dataStreamWriter.option("kafka.partitioner.class", "com.example.CustomKafkaPartitioner")