如何通过 KafkaAdminClient 删除主题配置
How to delete a topic configuration via KafkaAdminClient
我想删除之前被覆盖的主题的配置(将其重置为默认值)。这可以通过提供的脚本
$> ./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics \
--entity-name test --delete-config my.overridden.config
有没有办法使用 kafka-clients-1.1.1.jar 中提供的 KafkaAdminClient 来做到这一点?
我刚找到方法 org.apache.kafka.clients.admin.KafkaAdminClient.alterConfigs(Map<ConfigResource, Config>, AlterConfigsOptions)
,但是当我使用设置为 null 的配置值调用它时,我在服务器上得到 NullPointerException:
[2018-07-31 11:24:01,658] ERROR [Admin Manager on Broker 0]: Error processing alter configs request for resource Resource(type=TOPIC, name='test'}, config org.apache.kafka.common.requests.AlterConfigsRequest$Config@5d4fef59 (kafka.server.AdminManager)
java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:459)
at java.util.Properties.setProperty(Properties.java:166)
at kafka.server.AdminManager$$anonfun$alterConfigs$$anonfun$apply.apply(AdminManager.scala:357)
at kafka.server.AdminManager$$anonfun$alterConfigs$$anonfun$apply.apply(AdminManager.scala:356)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.AdminManager$$anonfun$alterConfigs.apply(AdminManager.scala:356)
at kafka.server.AdminManager$$anonfun$alterConfigs.apply(AdminManager.scala:339)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.AdminManager.alterConfigs(AdminManager.scala:339)
at kafka.server.KafkaApis.handleAlterConfigsRequest(KafkaApis.scala:1987)
at kafka.server.KafkaApis.handle(KafkaApis.scala:136)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:745)
空列表也不行。
我正在使用 2.11-1.1.0 版本的 Kafka。
Kafka admin jar 中提供的很多功能都是 API 而不是直接的脚本功能。您可以在 java 程序中使用 zookeeper AdminUtils 更改主题的配置,如下所示。向函数发送一个空 属性 对象以清除现有属性。
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
public static void changeConfig(String topic) {
ZkClient zkClient = new ZkClient("your_zkHost", 5000, 5000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection("your_zkHost"), false);
Properties prop = new Properties();
prop.setProperty("retention.ms", "3600000");
AdminUtils.changeTopicConfig(zkUtils, topic, prop);
}
如果你经常需要这个功能,你可以包含一个文件reader来获取新的配置并打包成一个jar以便于执行。
我想删除之前被覆盖的主题的配置(将其重置为默认值)。这可以通过提供的脚本
$> ./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics \
--entity-name test --delete-config my.overridden.config
有没有办法使用 kafka-clients-1.1.1.jar 中提供的 KafkaAdminClient 来做到这一点?
我刚找到方法 org.apache.kafka.clients.admin.KafkaAdminClient.alterConfigs(Map<ConfigResource, Config>, AlterConfigsOptions)
,但是当我使用设置为 null 的配置值调用它时,我在服务器上得到 NullPointerException:
[2018-07-31 11:24:01,658] ERROR [Admin Manager on Broker 0]: Error processing alter configs request for resource Resource(type=TOPIC, name='test'}, config org.apache.kafka.common.requests.AlterConfigsRequest$Config@5d4fef59 (kafka.server.AdminManager)
java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:459)
at java.util.Properties.setProperty(Properties.java:166)
at kafka.server.AdminManager$$anonfun$alterConfigs$$anonfun$apply.apply(AdminManager.scala:357)
at kafka.server.AdminManager$$anonfun$alterConfigs$$anonfun$apply.apply(AdminManager.scala:356)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.AdminManager$$anonfun$alterConfigs.apply(AdminManager.scala:356)
at kafka.server.AdminManager$$anonfun$alterConfigs.apply(AdminManager.scala:339)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.AdminManager.alterConfigs(AdminManager.scala:339)
at kafka.server.KafkaApis.handleAlterConfigsRequest(KafkaApis.scala:1987)
at kafka.server.KafkaApis.handle(KafkaApis.scala:136)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:745)
空列表也不行。
我正在使用 2.11-1.1.0 版本的 Kafka。
Kafka admin jar 中提供的很多功能都是 API 而不是直接的脚本功能。您可以在 java 程序中使用 zookeeper AdminUtils 更改主题的配置,如下所示。向函数发送一个空 属性 对象以清除现有属性。
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
public static void changeConfig(String topic) {
ZkClient zkClient = new ZkClient("your_zkHost", 5000, 5000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection("your_zkHost"), false);
Properties prop = new Properties();
prop.setProperty("retention.ms", "3600000");
AdminUtils.changeTopicConfig(zkUtils, topic, prop);
}
如果你经常需要这个功能,你可以包含一个文件reader来获取新的配置并打包成一个jar以便于执行。