Send/produce json 通过kafka消息
Send/produce json message through kafka
这是我第一次使用 Kafka,我打算将 kafka 与 .net 一起使用
我想知道我是否可以在制作事件时将 JSON 作为消息发送
我正在学习教程:https://developer.confluent.io/get-started/dotnet/#build-producer
另外,有没有办法将该值映射到模型,以便 value/json 结构始终绑定到该模型
例如:如果我希望我的 json 值为
{
"customerName":"anything",
"eventType":"one-of-three-enums",
"columnsChanged": "string value or something"
}
我能找到的大多数例子是这样的:
using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;
class Producer {
static void Main(string[] args)
{
if (args.Length != 1) {
Console.WriteLine("Please provide the configuration file path as a command line argument");
}
IConfiguration configuration = new ConfigurationBuilder()
.AddIniFile(args[0])
.Build();
const string topic = "purchases";
string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };
using (var producer = new ProducerBuilder<string, string>(
configuration.AsEnumerable()).Build())
{
var numProduced = 0;
const int numMessages = 10;
for (int i = 0; i < numMessages; ++i)
{
Random rnd = new Random();
var user = users[rnd.Next(users.Length)];
var item = items[rnd.Next(items.Length)];
producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
(deliveryReport) =>
{
if (deliveryReport.Error.Code != ErrorCode.NoError) {
Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
}
else {
Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
numProduced += 1;
}
});
}
producer.Flush(TimeSpan.FromSeconds(10));
Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
}
}
}
我希望该项目是 json 结构中的 class。
wanted to know if I can send JSON as a message when I producing an event
是的。 Kafka 存储字节并使用序列化程序转换字节。构建生产者时,您可以选择调用 SetValueSerializer
.
一些内置的序列化程序可以在 - https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Serializers.cs
找到
您需要自己编写以通用处理任何 JSON 模型类型。
将 Utf8Serializer 用于字符串时,您需要预先序列化模型中的对象 class,然后将其作为值发送。在您的示例中,您将 var item
替换为一些序列化对象。
How do I turn a C# object into a JSON string in .NET?
使用模型 classes 时,您的数据通常是强类型的,直到您开始手动编写 JSON 或使用字典类型。如果您想要外部消息验证,Confluent Schema Registry 就是一个支持 JSONSchema 的示例,confluent-dotnet-kafka
项目中的 JsonSerializer
支持这一点。
这是我第一次使用 Kafka,我打算将 kafka 与 .net 一起使用
我想知道我是否可以在制作事件时将 JSON 作为消息发送
我正在学习教程:https://developer.confluent.io/get-started/dotnet/#build-producer
另外,有没有办法将该值映射到模型,以便 value/json 结构始终绑定到该模型
例如:如果我希望我的 json 值为
{
"customerName":"anything",
"eventType":"one-of-three-enums",
"columnsChanged": "string value or something"
}
我能找到的大多数例子是这样的:
using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;
class Producer {
static void Main(string[] args)
{
if (args.Length != 1) {
Console.WriteLine("Please provide the configuration file path as a command line argument");
}
IConfiguration configuration = new ConfigurationBuilder()
.AddIniFile(args[0])
.Build();
const string topic = "purchases";
string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };
using (var producer = new ProducerBuilder<string, string>(
configuration.AsEnumerable()).Build())
{
var numProduced = 0;
const int numMessages = 10;
for (int i = 0; i < numMessages; ++i)
{
Random rnd = new Random();
var user = users[rnd.Next(users.Length)];
var item = items[rnd.Next(items.Length)];
producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
(deliveryReport) =>
{
if (deliveryReport.Error.Code != ErrorCode.NoError) {
Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
}
else {
Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
numProduced += 1;
}
});
}
producer.Flush(TimeSpan.FromSeconds(10));
Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
}
}
}
我希望该项目是 json 结构中的 class。
wanted to know if I can send JSON as a message when I producing an event
是的。 Kafka 存储字节并使用序列化程序转换字节。构建生产者时,您可以选择调用 SetValueSerializer
.
一些内置的序列化程序可以在 - https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Serializers.cs
找到您需要自己编写以通用处理任何 JSON 模型类型。
将 Utf8Serializer 用于字符串时,您需要预先序列化模型中的对象 class,然后将其作为值发送。在您的示例中,您将 var item
替换为一些序列化对象。
How do I turn a C# object into a JSON string in .NET?
使用模型 classes 时,您的数据通常是强类型的,直到您开始手动编写 JSON 或使用字典类型。如果您想要外部消息验证,Confluent Schema Registry 就是一个支持 JSONSchema 的示例,confluent-dotnet-kafka
项目中的 JsonSerializer
支持这一点。