Failed to construct kafka consumer

There are many answers on this topic, but none of them worked for me.

I am trying to run the following stream processor.

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.KStreamBuilder

object simplestream extends App {

    val builder: KStreamBuilder = new KStreamBuilder

    val streamingConfig = { //ToDo - Move these to config
      val settings = new Properties
      settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "example11")
      settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      // Specify default (de)serializers for record keys and for record values.
      settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
      settings
    }

    // Key/value types match the default serdes configured above.
    val users = builder.stream[String, Array[Byte]]("tt2")

    users.print()
    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.start()

}

Dependencies:

   //kafka
  "org.apache.kafka" % "kafka-streams" % "0.10.2.0",
  "org.apache.kafka" % "kafka-clients" % "0.10.2.0"

Error:

  [error] (run-main-1) org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
    at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
    at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:323)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:349)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:272)
    at kafka.simplestream$.runStream(simplestream.scala:36)
    at kafka.simplestream$.delayedEndpoint$kafka$simplestream(simplestream.scala:40)
    at kafka.simplestream$delayedInit$body.apply(simplestream.scala:12)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:378)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at kafka.simplestream$.main(simplestream.scala:12)
    at kafka.simplestream.main(simplestream.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.update(Lorg/apache/kafka/common/Cluster;J)V

I have tried different client versions without success. I am using Kafka 0.10.2.0. I am also seeing the following messages in ZooKeeper.

[2017-08-18 13:08:10,260] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:delete cxid:0x29 zxid:0x4d txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x35 zxid:0x4e txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x36 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

I am not sure what exactly is causing this. Producing and consuming messages otherwise works fine.

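java.lang.NoSuchMethodError: this error occurs when there are multiple versions of the client jar on your classpath. The missing method, org.apache.kafka.clients.Metadata.update, belongs to kafka-clients, so the Metadata class loaded at runtime does not come from the kafka-clients version that the calling code was compiled against. Check your classpath for a second kafka-clients jar.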
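
One way to check the classpath is to ask the JVM where it actually loaded the Kafka classes from. The following is only a minimal sketch: the object name ClasspathCheck and the helper locationOf are made up for illustration, and it assumes you run it with the same classpath as the failing application (for example from the same sbt project).

```scala
import scala.util.Try

// Hypothetical helper, not part of Kafka: reports where a class was loaded from.
object ClasspathCheck {

  /** Location (usually a jar path) that a class was loaded from, if it can be determined. */
  def locationOf(className: String): Option[String] =
    // initialize = false: only locate the class, do not run its static initializers
    Try(Class.forName(className, false, getClass.getClassLoader)).toOption
      .flatMap(c => Option(c.getProtectionDomain.getCodeSource)) // null for JDK bootstrap classes
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  def main(args: Array[String]): Unit = {
    Seq(
      "org.apache.kafka.clients.Metadata",    // shipped in kafka-clients
      "org.apache.kafka.streams.KafkaStreams" // shipped in kafka-streams
    ).foreach { name =>
      println(s"$name -> ${locationOf(name).getOrElse("not found on classpath")}")
    }
  }
}
```

If the two printed locations point at jars with different version numbers, that mismatch is the likely cause of the NoSuchMethodError. In an sbt build, the `evicted` task and the `dependencyOverrides` setting are the usual tools for finding and pinning the conflicting kafka-clients version, although the exact fix depends on which other dependency pulls it in.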

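The KeeperException messages logged by ZooKeeper are not a problem. They are INFO-level messages produced while Kafka creates znodes (nodes/folders) that may not yet exist in ZooKeeper.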