如何将 .Net Class 序列化为 Avro.Generic.GenericRecord 以发布到 kafka 主题?
how to Serialize .Net Class to Avro.Generic.GenericRecord for publishing into kafka topic?
我正在尝试寻找 way/helper 到 convert.Net Class 到 Avro.Generic.GenericRecord 。目前,我正在手动将字段名和字段值添加到通用记录中。是否有 serializer/converter 可用于将对象转换为通用记录并发布到 kafka 主题。
class Plant
{
public long Id { get; set; }
public string Name { get; set; }
public List<PlantProperties> PlantProperties{ get; set; }
}
class PlantProperties
{
public long Leaves{ get; set; }
public string Color{ get; set; }
}
求推荐。
假设您使用的是 Confluent Schema Regsitry,您可以使用他们的 .NET 客户端1
https://github.com/confluentinc/confluent-kafka-dotnet
从示例文件夹中复制
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
using (var producer = new Producer<string, GenericRecord>(producerConfig, serdeProvider.GetSerializerGenerator<string>(), serdeProvider.GetSerializerGenerator<GenericRecord>()))
{
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
int i = 0;
string text;
while ((text = Console.ReadLine()) != "q")
{
var record = new GenericRecord(s);
record.Add("name", text);
record.Add("favorite_number", i++);
record.Add("favorite_color", "blue");
producer
.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
.ContinueWith(task => task.IsFaulted
? $"error producing message: {task.Exception.Message}"
: $"produced to: {task.Result.TopicPartitionOffset}");
}
}
cts.Cancel();
}
根据您的情况,相应地更新 record.Add
使用
However,既然你有一个class,因此,你应该尝试使用SpecificRecord,而不是序列化回来通过 GenericRecord 在 Avro 和 .NET class 之间来回。有关此
的示例,请参阅 AvroGen 工具的自述文件部分
1.我不知道有替代的 .NET 库
以下是我使用@cricket_007的建议解决问题的步骤。
- 为避免编写 avro 模式的复杂性,请先创建 c# classes,然后使用 AvroSerializer 生成模式。
AvroSerializer.Create().WriterSchema.ToString()
- 这将为 class 生成架构 json。
将其移至模式文件并
- 使所有类型都具有必需的空值
- 然后使用avro_gen.exe工具重新生成class个实现了ISpecific Record的文件。
添加使用以下代码发布到队列
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
using (var producer = new Producer<string, MYClass>(producerConfig,
serdeProvider.GetSerializerGenerator<string>(),
serdeProvider.GetSerializerGenerator<MYClass>()))
{
Console.WriteLine($"{producer.Name} producing on
{_appSettings.PullListKafka.Topic}.");
producer.ProduceAsync(_appSettings.PullListKafka.Topic, new
Message<string, MYClass> { Key = Guid.NewGuid().ToString(), Value = MYClassObject})
.ContinueWith(task => task.IsFaulted
? $"error producing message: {task.Exception.Message}"
: $"produced to: {task.Result.TopicPartitionOffset}");
}
一些链接可以帮助做到这一点。
https://shanidgafur.github.io/blog/apache-avro-on-dotnet
https://github.com/SidShetye/HelloAvro/tree/master/Avro
我正在尝试寻找 way/helper 到 convert.Net Class 到 Avro.Generic.GenericRecord 。目前,我正在手动将字段名和字段值添加到通用记录中。是否有 serializer/converter 可用于将对象转换为通用记录并发布到 kafka 主题。
class Plant
{
public long Id { get; set; }
public string Name { get; set; }
public List<PlantProperties> PlantProperties{ get; set; }
}
class PlantProperties
{
public long Leaves{ get; set; }
public string Color{ get; set; }
}
求推荐。
假设您使用的是 Confluent Schema Regsitry,您可以使用他们的 .NET 客户端1
https://github.com/confluentinc/confluent-kafka-dotnet
从示例文件夹中复制
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
using (var producer = new Producer<string, GenericRecord>(producerConfig, serdeProvider.GetSerializerGenerator<string>(), serdeProvider.GetSerializerGenerator<GenericRecord>()))
{
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
int i = 0;
string text;
while ((text = Console.ReadLine()) != "q")
{
var record = new GenericRecord(s);
record.Add("name", text);
record.Add("favorite_number", i++);
record.Add("favorite_color", "blue");
producer
.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
.ContinueWith(task => task.IsFaulted
? $"error producing message: {task.Exception.Message}"
: $"produced to: {task.Result.TopicPartitionOffset}");
}
}
cts.Cancel();
}
根据您的情况,相应地更新 record.Add
使用
However,既然你有一个class,因此,你应该尝试使用SpecificRecord,而不是序列化回来通过 GenericRecord 在 Avro 和 .NET class 之间来回。有关此
的示例,请参阅 AvroGen 工具的自述文件部分1.我不知道有替代的 .NET 库
以下是我使用@cricket_007的建议解决问题的步骤。
- 为避免编写 avro 模式的复杂性,请先创建 c# classes,然后使用 AvroSerializer 生成模式。
AvroSerializer.Create().WriterSchema.ToString()
- 这将为 class 生成架构 json。 将其移至模式文件并
- 使所有类型都具有必需的空值
- 然后使用avro_gen.exe工具重新生成class个实现了ISpecific Record的文件。
添加使用以下代码发布到队列
using (var serdeProvider = new AvroSerdeProvider(avroConfig)) using (var producer = new Producer<string, MYClass>(producerConfig, serdeProvider.GetSerializerGenerator<string>(), serdeProvider.GetSerializerGenerator<MYClass>())) { Console.WriteLine($"{producer.Name} producing on {_appSettings.PullListKafka.Topic}."); producer.ProduceAsync(_appSettings.PullListKafka.Topic, new Message<string, MYClass> { Key = Guid.NewGuid().ToString(), Value = MYClassObject}) .ContinueWith(task => task.IsFaulted ? $"error producing message: {task.Exception.Message}" : $"produced to: {task.Result.TopicPartitionOffset}"); }
一些链接可以帮助做到这一点。
https://shanidgafur.github.io/blog/apache-avro-on-dotnet https://github.com/SidShetye/HelloAvro/tree/master/Avro