如何将 .Net Class 序列化为 Avro.Generic.GenericRecord 以发布到 kafka 主题?

how to Serialize .Net Class to Avro.Generic.GenericRecord for publishing into kafka topic?

我正在尝试寻找 way/helper 到 convert.Net Class 到 Avro.Generic.GenericRecord 。目前,我正在手动将字段名和字段值添加到通用记录中。是否有 serializer/converter 可用于将对象转换为通用记录并发布到 kafka 主题。

class Plant
{
 public long Id { get; set; }
 public string Name { get; set; }
 public List<PlantProperties> PlantProperties{ get; set; }
}
class PlantProperties
{
 public long Leaves{ get; set; }
 public string Color{ get; set; }
}

求推荐。

假设您使用的是 Confluent Schema Regsitry,您可以使用他们的 .NET 客户端1

https://github.com/confluentinc/confluent-kafka-dotnet

从示例文件夹中复制

    using (var serdeProvider = new AvroSerdeProvider(avroConfig))
    using (var producer = new Producer<string, GenericRecord>(producerConfig, serdeProvider.GetSerializerGenerator<string>(), serdeProvider.GetSerializerGenerator<GenericRecord>()))
    {
        Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

        int i = 0;
        string text;
        while ((text = Console.ReadLine()) != "q")
        {
            var record = new GenericRecord(s);
            record.Add("name", text);
            record.Add("favorite_number", i++);
            record.Add("favorite_color", "blue");

            producer
                .ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
                .ContinueWith(task => task.IsFaulted
                    ? $"error producing message: {task.Exception.Message}"
                    : $"produced to: {task.Result.TopicPartitionOffset}");
        }
    }

    cts.Cancel();
}

根据您的情况,相应地更新 record.Add 使用


However,既然你有一个class,因此,你应该尝试使用SpecificRecord,而不是序列化回来通过 GenericRecord 在 Avro 和 .NET class 之间来回。有关此

的示例,请参阅 AvroGen 工具的自述文件部分

1.我不知道有替代的 .NET 库

以下是我使用@cricket_007的建议解决问题的步骤。

  1. 为避免编写 avro 模式的复杂性,请先创建 c# classes,然后使用 AvroSerializer 生成模式。

AvroSerializer.Create().WriterSchema.ToString()

  1. 这将为 class 生成架构 json。 将其移至模式文件并
  2. 使所有类型都具有必需的空值
  3. 然后使用avro_gen.exe工具重新生成class个实现了ISpecific Record的文件。
  4. 添加使用以下代码发布到队列

    using (var serdeProvider = new AvroSerdeProvider(avroConfig))
            using (var producer = new Producer<string, MYClass>(producerConfig, 
      serdeProvider.GetSerializerGenerator<string>(), 
      serdeProvider.GetSerializerGenerator<MYClass>()))
            {
                Console.WriteLine($"{producer.Name} producing on 
           {_appSettings.PullListKafka.Topic}.");  
    
                producer.ProduceAsync(_appSettings.PullListKafka.Topic, new 
    Message<string, MYClass> { Key = Guid.NewGuid().ToString(), Value = MYClassObject})
                        .ContinueWith(task => task.IsFaulted
                            ? $"error producing message: {task.Exception.Message}"
                            : $"produced to: {task.Result.TopicPartitionOffset}");
    
            }
    

一些链接可以帮助做到这一点。

https://shanidgafur.github.io/blog/apache-avro-on-dotnet https://github.com/SidShetye/HelloAvro/tree/master/Avro