On KafkaProducer.send(message), I am getting "exception Error serializing Avro message"

I am using Avro-generated classes. This is what my producer code looks like:

    // Build the Avro record from the incoming status and its user
    TweetInfo tweetInfo = TweetInfo.newBuilder()
            .setTweetId(status.getId())
            .setTweetCreatedAt(status.getCreatedAt().toString())
            .setTweetMessage(status.getText())
            .setUserId(user.getId())
            .setUserCreatedAt(user.getCreatedAt().toString())
            .setUserName(user.getName())
            .setUserScreenName(user.getScreenName())
            .build();

    // Diamond operator avoids the raw-type warning; the record has no key
    ProducerRecord<String, TweetInfo> data = new ProducerRecord<>(KafkaConstants.TOPIC, tweetInfo);
    producer.send(data);

The TweetInfo class is generated from an Avro schema. When I run the program, I see the following stack trace:

    2018-12-11 01:51:58.138  WARN 16244 --- [c Dispatcher[0]] o.i.service.kafka.TweetKafkaProducer     : exception Error serializing Avro message
2018-12-11 01:51:59.162 ERROR 16244 --- [c Dispatcher[0]] i.c.k.s.client.rest.RestService          : Failed to send HTTP request to endpoint: http://localhost:8081/subjects/twitterData-value/versions

java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_152]
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_152]
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_152]
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_152]
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_152]
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_152]
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_152]
    at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_152]
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[na:1.8.0_152]
    at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1334) ~[na:1.8.0_152]
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1309) ~[na:1.8.0_152]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:178) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:326) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:318) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:313) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:114) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:153) [kafka-schema-registry-client-5.0.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79) [kafka-avro-serializer-5.0.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) [kafka-avro-serializer-5.0.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60) [kafka-clients-2.1.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:879) [kafka-clients-2.1.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841) [kafka-clients-2.1.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:728) [kafka-clients-2.1.0.jar:na]
    at org.interview.service.kafka.TweetKafkaProducer.onStatus(TweetKafkaProducer.java:95) [classes/:na]
    at twitter4j.StatusStreamImpl.onStatus(StatusStreamImpl.java:75) [twitter4j-stream-4.0.6.jar:4.0.6]
    at twitter4j.StatusStreamBase.run(StatusStreamBase.java:105) [twitter4j-stream-4.0.6.jar:4.0.6]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]

I have ZooKeeper and Kafka running. Do I also need to run Schema Registry? If so, is there a guide for doing that? I couldn't find one.

Failed to send HTTP request to endpoint

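The Confluent Schema Registry server needs to be running. The stack trace shows the Avro serializer trying to register the schema at http://localhost:8081 and getting "Connection refused", which means nothing is listening on that port. You might want to try hitting the HTTP endpoint yourself (see the docs below).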
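If you would rather check reachability from Java than from a browser, here is a minimal sketch. The class and method names (SchemaRegistryProbe, isUp) are made up for illustration; the base URL is the one from your stack trace, and GET /subjects is the registry's endpoint for listing subjects. It only tells you whether something answers with HTTP 200 at that address, nothing more.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical helper, not part of any Confluent library.
final class SchemaRegistryProbe {

    /** Returns true if GET <baseUrl>/subjects answers with HTTP 200 within the timeout. */
    static boolean isUp(String baseUrl, int timeoutMillis) {
        HttpURLConnection conn = null;
        try {
            conn = (HttpURLConnection) new URL(baseUrl + "/subjects").openConnection();
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(timeoutMillis);
            conn.setReadTimeout(timeoutMillis);
            return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
        } catch (IOException | ClassCastException e) {
            // Connection refused, timeout, malformed URL, or a non-HTTP URL
            return false;
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    public static void main(String[] args) {
        // Assumed default: the address that appears in the stack trace
        String url = args.length > 0 ? args[0] : "http://localhost:8081";
        System.out.println(url + (isUp(url, 2000) ? " is reachable" : " is NOT reachable"));
    }
}
```

With the registry stopped, this should report the address as not reachable, which matches the "Connection refused" in your log.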

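I'm not sure how you started it, but you can download Confluent OSS, extract it somewhere, then open a terminal, navigate to the bin directory of the extracted folder, and run the command confluent start schema-registry (note: this only works on Linux).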

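Or, if you want a "production deployment" configuration, you first need to edit the property files in the etc folder, then start ZooKeeper, Kafka, and the Registry one by one using their respective scripts.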

Docs: Running Schema Registry


Regarding the comment

When I try and run the commands in article it gives error that bin is not valid command

$ bin/... assumes that you have first cd'd into the extracted confluent-x.x.x folder.


By the way, there are existing Kafka Connect projects that interact with the Twitter API.

As @cricket_007 said, if you are on Windows, try using Docker.

The docker compose file at the link below will run Kafka, ZooKeeper, Schema Registry, and Kafka REST; with it you can easily test your producer. https://github.com/confluentinc/docker-images/blob/master/examples/fullstack/docker-compose.yml

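Edit: Sorry, my mistake, that is a link to the old repo. Check the one below, which has the whole Confluent Platform (you can delete what you don't need)!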

https://github.com/confluentinc/cp-docker-images/blob/5.0.1-post/examples/cp-all-in-one/docker-compose.yml
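Once the stack is up, the producer has to be pointed at both the broker and the registry. Here is a rough sketch of the relevant properties. The class name TweetProducerConfig is made up, and the addresses localhost:9092 and http://localhost:8081 are assumptions (8081 is what your stack trace shows; check the compose file for the ports it actually exposes). The property keys and serializer class names are the standard ones, and KafkaAvroSerializer is the one in your stack trace.

```java
import java.util.Properties;

// Hypothetical helper; adjust the addresses to your environment.
final class TweetProducerConfig {

    static Properties build(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // The Avro serializer registers the value schema with Schema Registry on send
        props.setProperty("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        // Assumed local addresses, not taken from the compose file
        Properties props = build("localhost:9092", "http://localhost:8081");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

You would then pass the resulting Properties to the KafkaProducer constructor in your existing code. If schema.registry.url points at an address where nothing is listening, send() will fail with the same serialization error as in the question.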