Kafka - Error instantiating the Producer class in ASP.NET Core
I am using Confluent.Kafka with the following code:
string kafkaEndpoint = "127.0.0.1:9092";
string kafkaTopic = "testtopic";
var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };
using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
{
    // Send 10 messages to the topic
    for (int i = 0; i < 10; i++)
    {
        var message = $"Event {i}";
        var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
        Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
    }
}
I get the following compile-time error:
'Producer.ProduceAsync(TopicPartition, Message)' is inaccessible due to its protection level
Using ProducerBuilder like this:
var result = producer.ProduceAsync(kafkaTopic, new Message<Null, MyClass>{ Value = myObject}).GetAwaiter().GetResult();
shows the error:
Cannot convert from 'Confluent.Kafka.Message<Confluent.Kafka.Null, MyClass>' to 'Confluent.Kafka.Message<Confluent.Kafka.Null, string>'
In the Confluent.Kafka NuGet package v1.0.1, the Producer class is internal, so it is not accessible from your code. It looks like you need to use ProducerBuilder instead, for example:
var producerConfig = new Dictionary<string, string> { { "bootstrap.servers", kafkaEndpoint } };
using (var producer = new ProducerBuilder<Null, string>(producerConfig)
    .SetKeySerializer(Serializers.Null)
    .SetValueSerializer(Serializers.Utf8)
    .Build())
{
    // Send 10 messages to the topic
    for (int i = 0; i < 10; i++)
    {
        var message = $"Event {i}";
        var result = producer.ProduceAsync(kafkaTopic, new Message<Null, string>{ Value = message}).GetAwaiter().GetResult();
        Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
    }
}
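It looks like an instance of an arbitrary class can be sent the same way (replace MyClass with the target class). The producer itself must then be built as ProducerBuilder<Null, MyClass> with a value serializer for MyClass; a producer built as ProducerBuilder<Null, string> only accepts Message<Null, string>, which is what the conversion error above reports: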
var result = producer.ProduceAsync(kafkaTopic, new Message<Null, MyClass>{ Value = myObject}).GetAwaiter().GetResult();
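For the line above to compile, the producer's type parameters have to match the message's. The following is only a sketch of one way to set that up, not necessarily how the original poster solved it. It assumes a hypothetical MyClass with two properties and a hypothetical JsonValueSerializer<T> helper that implements Confluent.Kafka's ISerializer<T> using System.Text.Json (available in .NET Core 3.0 and later, or as a NuGet package). Neither name comes from the library, and any other serialization format would work just as well.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical payload type standing in for MyClass in the answer.
public class MyClass
{
    public int Id { get; set; }
    public string Name { get; set; }
}

// Hypothetical helper (not part of Confluent.Kafka): turns any value into UTF-8 JSON bytes.
public class JsonValueSerializer<T> : ISerializer<T>
{
    public byte[] Serialize(T data, SerializationContext context)
    {
        return JsonSerializer.SerializeToUtf8Bytes(data);
    }
}

public static class Program
{
    public static void Main()
    {
        string kafkaEndpoint = "127.0.0.1:9092";
        string kafkaTopic = "testtopic";
        var producerConfig = new Dictionary<string, string> { { "bootstrap.servers", kafkaEndpoint } };

        // The value type parameter is MyClass, so Message<Null, MyClass> is accepted.
        using (var producer = new ProducerBuilder<Null, MyClass>(producerConfig)
            .SetKeySerializer(Serializers.Null)
            .SetValueSerializer(new JsonValueSerializer<MyClass>())
            .Build())
        {
            var myObject = new MyClass { Id = 1, Name = "Event 1" };
            var result = producer.ProduceAsync(kafkaTopic, new Message<Null, MyClass> { Value = myObject }).GetAwaiter().GetResult();
            Console.WriteLine($"Sent on Partition: {result.Partition} with Offset: {result.Offset}");
        }
    }
}
```

A consumer reading this topic would need a matching deserializer for MyClass, since the broker only ever sees the raw bytes.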