为 Kafka 编译自定义生产者
Compiling a custom producer for Kafka
我已经从 https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz 下载了 Kafka,并使用 VM 在我的机器上设置了一个 kafka 集群。集群工作正常——它是使用 kafka 包提供的控制台生产者和消费者进行测试的。
现在,我已经为 Kafka 实现了自定义 Producer class。但是我不知道如何编译这个 class 以及依赖项是什么。
问题
有人可以解释我需要如何为 Producer 获取依赖项,构建 class 和 运行 吗?
我需要 sbt 来构建它吗?我找不到任何在线资源来清楚地解释如何构建自定义 kafka 生产者 class.
以下是 Producer 中导入的包 class:
org.apache.kafka.clients.producer.Callback;
org.apache.kafka.clients.producer.KafkaProducer;
org.apache.kafka.clients.producer.ProducerConfig;
org.apache.kafka.clients.producer.ProducerRecord;
org.apache.kafka.clients.producer.RecordMetadata;
org.apache.kafka.common.record.Records
提前致谢
我开发了一个自定义的Kafka生产者,作为一个Maven项目,我使用的依赖是:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.0</version>
</dependency>
我使用的进口:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
我的生产者消息发送代码片段:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, zkConnection);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
byte[] byteData = null;
File myInputFile = new File(...);
try (InputStream inputStream = new FileInputStream(myInputFile)) {
byteData = IOUtils.toByteArray(inputStream);
}
try (KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props)) {
producer.send(new ProducerRecord<String, byte[]>(topic, byteData));
}
我已经从 https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz 下载了 Kafka,并使用 VM 在我的机器上设置了一个 kafka 集群。集群工作正常——它是使用 kafka 包提供的控制台生产者和消费者进行测试的。
现在,我已经为 Kafka 实现了自定义 Producer class。但是我不知道如何编译这个 class 以及依赖项是什么。
问题
有人可以解释我需要如何为 Producer 获取依赖项,构建 class 和 运行 吗?
我需要 sbt 来构建它吗?我找不到任何在线资源来清楚地解释如何构建自定义 kafka 生产者 class.
以下是 Producer 中导入的包 class:
org.apache.kafka.clients.producer.Callback;
org.apache.kafka.clients.producer.KafkaProducer;
org.apache.kafka.clients.producer.ProducerConfig;
org.apache.kafka.clients.producer.ProducerRecord;
org.apache.kafka.clients.producer.RecordMetadata;
org.apache.kafka.common.record.Records
提前致谢
我开发了一个自定义的Kafka生产者,作为一个Maven项目,我使用的依赖是:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.0</version>
</dependency>
我使用的进口:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
我的生产者消息发送代码片段:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, zkConnection);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
byte[] byteData = null;
File myInputFile = new File(...);
try (InputStream inputStream = new FileInputStream(myInputFile)) {
byteData = IOUtils.toByteArray(inputStream);
}
try (KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props)) {
producer.send(new ProducerRecord<String, byte[]>(topic, byteData));
}