Kafka network Processor error in producer program (ArrayIndexOutOfBoundsException: 18)
I have the Kafka producer API program below (I am new to Kafka itself). The code fetches data from one of our APIs and sends the messages to a Kafka topic.
package kafka_Demo;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Properties;

import org.apache.kafka.clients.producer.*;

public class HttpBasicAuth {

    public static void main(String[] args) {
        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("zookeeper.connect", "localhost:2181");
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            Json_read count = new Json_read();
            URL url = new URL("https://alm.sysbiz.org/rest/api/2/search?jql=project=ALG&maxResults=0");
            long total_ticket = count.ticketCount(url);
            Alm_authentication alm = new Alm_authentication();

            for (long i = 0; i <= total_ticket; i = i + 20) {
                url = new URL("https://alm.sysbiz.org/rest/api/2/search?jql=project=ALG&expand=changelog&startAt=" + i + "&maxResults=20");
                InputStream content = (InputStream) alm.performAuth(url);
                if (content != null) {
                    BufferedReader in = new BufferedReader(new InputStreamReader(content));
                    String line;
                    while ((line = in.readLine()) != null) {
                        //writer.println(line);
                        // System.out.println(line);
                        producer.send(new ProducerRecord<String, String>("test", "a11", line)).get();
                    }
                }
                producer.send(new ProducerRecord<String, String>("test", "a11", "\n"));
                content.close();
            }
            producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
While running the code above, I get the following error on the Kafka broker:
ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
at kafka.network.Processor$$anonfun$run.apply(SocketServer.scala:426)
at kafka.network.Processor$$anonfun$run.apply(SocketServer.scala:421)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.run(SocketServer.scala:421)
at java.lang.Thread.run(Thread.java:748)
I have searched Google many times without success. Can anyone help me solve it? Below is the error I can see in my Eclipse IDE:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1124)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:648)
at kafka_Demo.HttpBasicAuth.main(HttpBasicAuth.java:40)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
This is caused by a Kafka version mismatch. Make sure the version of the kafka-clients jar in your project matches the version of the Kafka server installed on your machine. API key 18 is the ApiVersions request, which 0.10.0+ clients send when they first connect; an older broker (for example 0.9.x) does not know that key, so ApiKeys.forId fails with the ArrayIndexOutOfBoundsException: 18 you see in the broker log.
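As a quick sanity check, the sketch below prints the client version your program actually loaded and the jar it came from (a minimal diagnostic sketch; AppInfoParser is an internal kafka-clients utility, so treat this as troubleshooting aid rather than a stable API). Compare the printed version against the broker version shown at the top of the broker's server.log.

import org.apache.kafka.common.utils.AppInfoParser;

public class ClientVersionCheck {
    public static void main(String[] args) {
        // Version of the kafka-clients jar that is actually on the classpath.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        // Jar the client classes were loaded from, in case several
        // kafka-clients jars are visible to the JVM.
        System.out.println("loaded from: "
                + AppInfoParser.class.getProtectionDomain().getCodeSource().getLocation());
    }
}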
I hit a similar problem even though my Eclipse project and my Kafka installation used the same jars. Then I found the root cause: I had another version of ZooKeeper installed on my machine for a Solr setup, and its lib directory had been added to the CLASSPATH environment variable. So when I started Kafka's ZooKeeper, it picked up that other library. I removed the ZooKeeper entry from the classpath, started Kafka's bundled ZooKeeper again, and it worked :)
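If you suspect the same kind of classpath mix-up, a small check like the one below shows which ZooKeeper jar a JVM started in that environment actually resolves (a hypothetical diagnostic sketch; org.apache.zookeeper.ZooKeeper is the real ZooKeeper client class, the rest is plain reflection):

public class ZkJarCheck {
    public static void main(String[] args) throws Exception {
        // CLASSPATH environment variable, where a stray Solr/ZooKeeper
        // entry would show up.
        System.out.println("CLASSPATH = " + System.getenv("CLASSPATH"));
        // Jar that org.apache.zookeeper.ZooKeeper is actually loaded from.
        Class<?> zk = Class.forName("org.apache.zookeeper.ZooKeeper");
        System.out.println("ZooKeeper loaded from: "
                + zk.getProtectionDomain().getCodeSource().getLocation());
    }
}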