将 kafka-streams 与自定义分区程序一起使用
Using kafka-streams with custom partitioner
我想加入一个带有 KTable 的 KStream。两者都有不同的密钥,但使用自定义分区程序进行了共同分区。但是,连接不会产生结果。
KStream 的结构如下
- 关键:房子 - 组
- 值:用户
KTable 的结构如下
- 键:用户 - 组
- 值:地址
为了确保每个插入都按插入顺序处理两个主题,我使用了自定义分区程序,我在其中使用每个键的组部分对两个主题进行分区。
我想以以下结构的流结束:
- 关键:房子 - 组
- 值:用户 - 地址
为此,我正在执行以下操作:
val streamsBuilder = streamBuilderHolder.streamsBuilder
val houseToUser = streamsBuilder.stream<HouseGroup, User>("houseToUser")
val userToAddress = streamsBuilder.table<UserGroup, Address>("userToAddress")
val result: KStream<HouseGroup, UserWithAddress> = houseToUser
.map { k: HouseGroup, v: User ->
val newKey = UserGroup(v, k.group)
val newVal = UserHouse(v, k.house)
KeyValue(newKey, newVal)
}
.join(userToAddress) { v1: UserHouse, v2: Address ->
UserHouseWithAddress(v1, v2)
}
.map{k: UserGroup, v: UserHouseWithAddress ->
val newKey = HouseGroup(v.house, k.group)
val newVal = UserWithAddress(k.user, v.address)
KeyValue(newKey, newVal)
}
这需要一个匹配的连接,但没有成功。
我想显而易见的解决方案是加入全局 table 并放弃自定义分区程序。但是,我仍然不明白为什么上面的方法不起作用。
我认为缺少匹配是因为使用了不同个分区器。
对于您输入的主题,使用了 CustomPartitioner
。 Kafka Streams 默认使用 org.apache.kafka.clients.producer.internals.DefaultPartitioner
。
在您调用 KStream::join
之前的代码中,您调用了 KStream::map
。 KStream::map
函数在 KStream::join
之前强制重新分区。在重新分区期间,消息被刷新到 Kafka($AppName-KSTREAM-MAP-000000000X-repartition
主题)。为了传播消息,Kafka Streams 使用定义的分区器 (属性: ProducerConfig.PARTITIONER_CLASS_CONFIG
)。总结:对于“repartition topic”和“KTable topic”
,具有相同键的消息可能位于不同的分区中
您的解决方案将在您的 Kafka Streams 应用程序的属性中设置 您的自定义 分区 (props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner"
)
对于调试,您可以检查重新分区主题 ($AppName-KSTREAM-MAP-000000000X-repartition
)。具有相同键(如输入主题)的消息可能位于不同分区(不同数量)
的文档
试试这个,对我有用。
static async System.Threading.Tasks.Task Main(string[] args)
{
int count = 0;
string line = null;
var appConfig = getAppConfig(Enviroment.Dev);
var schemaRegistrConfig = getSchemmaRegistryConfig(appConfig);
var registry = new CachedSchemaRegistryClient(schemaRegistrConfig);
var serializer = new AvroSerializer<YourAvroSchemaClass>(registry);
var adminClient = new AdminClientBuilder(new AdminClientConfig( getClientConfig(appConfig))).Build();
var topics = new List<TopicSpecification>(){ new TopicSpecification { Name = appConfig.OutputTopic, NumPartitions = 11}};
await adminClient.CreateTopicsAsync(topics);
var producerConfig = getProducerConfig(appConfig);
var producer = new ProducerBuilder<string, byte[]>(producerConfig)
.SetPartitioner(appConfig.OutputTopic, (string topicName, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull) =>
{
var keyValueInInt = Convert.ToInt32(System.Text.UTF8Encoding.UTF8.GetString(keyData.ToArray()));
return (Partition)Math.Floor((double)(keyValueInInt % partitionCount));
}).Build();
using (producer)
{
Console.WriteLine($"Start to load data from : {appConfig.DataFileName}: { DateTime.Now} ");
var watch = new Stopwatch();
watch.Start();
try
{
var stream = new StreamReader(appConfig.DataFileName);
while ((line = stream.ReadLine()) != null)
{
var message = parseLine(line);
var data = await serializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, appConfig.OutputTopic));
producer.Produce(appConfig.OutputTopic, new Message<string, byte[]> { Key = message.Key, Value = data });
if (count++ % 1000 == 0)
{
producer.Flush();
Console.WriteLine($"Write ... {count} in {watch.Elapsed.TotalSeconds} seconds");
}
}
producer.Flush();
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Line: {line}");
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
System.Environment.Exit(101);
}
finally
{
producer.Flush();
}
}
}
我想加入一个带有 KTable 的 KStream。两者都有不同的密钥,但使用自定义分区程序进行了共同分区。但是,连接不会产生结果。
KStream 的结构如下
- 关键:房子 - 组
- 值:用户
KTable 的结构如下
- 键:用户 - 组
- 值:地址
为了确保每个插入都按插入顺序处理两个主题,我使用了自定义分区程序,我在其中使用每个键的组部分对两个主题进行分区。
我想以以下结构的流结束:
- 关键:房子 - 组
- 值:用户 - 地址
为此,我正在执行以下操作:
val streamsBuilder = streamBuilderHolder.streamsBuilder
val houseToUser = streamsBuilder.stream<HouseGroup, User>("houseToUser")
val userToAddress = streamsBuilder.table<UserGroup, Address>("userToAddress")
val result: KStream<HouseGroup, UserWithAddress> = houseToUser
.map { k: HouseGroup, v: User ->
val newKey = UserGroup(v, k.group)
val newVal = UserHouse(v, k.house)
KeyValue(newKey, newVal)
}
.join(userToAddress) { v1: UserHouse, v2: Address ->
UserHouseWithAddress(v1, v2)
}
.map{k: UserGroup, v: UserHouseWithAddress ->
val newKey = HouseGroup(v.house, k.group)
val newVal = UserWithAddress(k.user, v.address)
KeyValue(newKey, newVal)
}
这需要一个匹配的连接,但没有成功。
我想显而易见的解决方案是加入全局 table 并放弃自定义分区程序。但是,我仍然不明白为什么上面的方法不起作用。
我认为缺少匹配是因为使用了不同个分区器。
对于您输入的主题,使用了 CustomPartitioner
。 Kafka Streams 默认使用 org.apache.kafka.clients.producer.internals.DefaultPartitioner
。
在您调用 KStream::join
之前的代码中,您调用了 KStream::map
。 KStream::map
函数在 KStream::join
之前强制重新分区。在重新分区期间,消息被刷新到 Kafka($AppName-KSTREAM-MAP-000000000X-repartition
主题)。为了传播消息,Kafka Streams 使用定义的分区器 (属性: ProducerConfig.PARTITIONER_CLASS_CONFIG
)。总结:对于“repartition topic”和“KTable topic”
您的解决方案将在您的 Kafka Streams 应用程序的属性中设置 您的自定义 分区 (props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner"
)
对于调试,您可以检查重新分区主题 ($AppName-KSTREAM-MAP-000000000X-repartition
)。具有相同键(如输入主题)的消息可能位于不同分区(不同数量)
试试这个,对我有用。
static async System.Threading.Tasks.Task Main(string[] args)
{
int count = 0;
string line = null;
var appConfig = getAppConfig(Enviroment.Dev);
var schemaRegistrConfig = getSchemmaRegistryConfig(appConfig);
var registry = new CachedSchemaRegistryClient(schemaRegistrConfig);
var serializer = new AvroSerializer<YourAvroSchemaClass>(registry);
var adminClient = new AdminClientBuilder(new AdminClientConfig( getClientConfig(appConfig))).Build();
var topics = new List<TopicSpecification>(){ new TopicSpecification { Name = appConfig.OutputTopic, NumPartitions = 11}};
await adminClient.CreateTopicsAsync(topics);
var producerConfig = getProducerConfig(appConfig);
var producer = new ProducerBuilder<string, byte[]>(producerConfig)
.SetPartitioner(appConfig.OutputTopic, (string topicName, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull) =>
{
var keyValueInInt = Convert.ToInt32(System.Text.UTF8Encoding.UTF8.GetString(keyData.ToArray()));
return (Partition)Math.Floor((double)(keyValueInInt % partitionCount));
}).Build();
using (producer)
{
Console.WriteLine($"Start to load data from : {appConfig.DataFileName}: { DateTime.Now} ");
var watch = new Stopwatch();
watch.Start();
try
{
var stream = new StreamReader(appConfig.DataFileName);
while ((line = stream.ReadLine()) != null)
{
var message = parseLine(line);
var data = await serializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, appConfig.OutputTopic));
producer.Produce(appConfig.OutputTopic, new Message<string, byte[]> { Key = message.Key, Value = data });
if (count++ % 1000 == 0)
{
producer.Flush();
Console.WriteLine($"Write ... {count} in {watch.Elapsed.TotalSeconds} seconds");
}
}
producer.Flush();
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Line: {line}");
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
System.Environment.Exit(101);
}
finally
{
producer.Flush();
}
}
}