Send/produce json 通过kafka消息

Send/produce json message through kafka

这是我第一次使用 Kafka,我打算将 kafka 与 .net 一起使用

我想知道我是否可以在制作事件时将 JSON 作为消息发送

我正在学习教程:https://developer.confluent.io/get-started/dotnet/#build-producer

另外,有没有办法将该值映射到模型,以便 value/json 结构始终绑定到该模型

例如:如果我希望我的 json 值为

{
  "customerName":"anything",
  "eventType":"one-of-three-enums",
  "columnsChanged": "string value or something"
}

我能找到的大多数例子是这样的:

using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;

class Producer {
    static void Main(string[] args)
    {
        if (args.Length != 1) {
            Console.WriteLine("Please provide the configuration file path as a command line argument");
        }

        IConfiguration configuration = new ConfigurationBuilder()
            .AddIniFile(args[0])
            .Build();

        const string topic = "purchases";

        string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
        string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };

        using (var producer = new ProducerBuilder<string, string>(
            configuration.AsEnumerable()).Build())
        {
            var numProduced = 0;
            const int numMessages = 10;
            for (int i = 0; i < numMessages; ++i)
            {
                Random rnd = new Random();
                var user = users[rnd.Next(users.Length)];
                var item = items[rnd.Next(items.Length)];

                producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
                    (deliveryReport) =>
                    {
                        if (deliveryReport.Error.Code != ErrorCode.NoError) {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        }
                        else {
                            Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
                            numProduced += 1;
                        }
                    });
            }

            producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
        }
    }
}

我希望该项目是 json 结构中的 class。

wanted to know if I can send JSON as a message when I producing an event

是的。 Kafka 存储字节并使用序列化程序转换字节。构建生产者时,您可以选择调用 SetValueSerializer.

一些内置的序列化程序可以在 - https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Serializers.cs

找到

您需要自己编写以通用处理任何 JSON 模型类型。

将 Utf8Serializer 用于字符串时,您需要预先序列化模型中的对象 class,然后将其作为值发送。在您的示例中,您将 var item 替换为一些序列化对象。

How do I turn a C# object into a JSON string in .NET?

使用模型 classes 时,您的数据通常是强类型的,直到您开始手动编写 JSON 或使用字典类型。如果您想要外部消息验证,Confluent Schema Registry 就是一个支持 JSONSchema 的示例,confluent-dotnet-kafka 项目中的 JsonSerializer 支持这一点。