Kafka Connect SMT 添加 Kafka header 字段
Kafka Connect SMT to add Kafka header fields
我需要查找或编写一个 SMT 来向请求添加 header 字段。请求缺少一些类型字段,我想添加它们。
你究竟如何在 SMT 中添加一个 header 我所看到的只是像下面这样的记录转换但是如果它的 header 我想更改或添加一个字段怎么办?
private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
// record.headers.add(Header) but how do I define the header
// or record.headers.add(String, Schema) but I am not sure how to define Schema?
final Map<String, Object> updatedValue = new HashMap<>(value);
updatedValue.put(fieldName, getRandomUuid());
return newRecord(record, null, updatedValue);
}
这应该有效
Headers headers = new ConnectHeaders();
headers.add(myKey, myValue, mySchema);
headers.forEach(h -> record.headers().add(h));
ConnectHeaders 信息可在此处找到 - https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/header/Headers.html
我需要查找或编写一个 SMT 来向请求添加 header 字段。请求缺少一些类型字段,我想添加它们。
你究竟如何在 SMT 中添加一个 header 我所看到的只是像下面这样的记录转换但是如果它的 header 我想更改或添加一个字段怎么办?
private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
// record.headers.add(Header) but how do I define the header
// or record.headers.add(String, Schema) but I am not sure how to define Schema?
final Map<String, Object> updatedValue = new HashMap<>(value);
updatedValue.put(fieldName, getRandomUuid());
return newRecord(record, null, updatedValue);
}
这应该有效
Headers headers = new ConnectHeaders();
headers.add(myKey, myValue, mySchema);
headers.forEach(h -> record.headers().add(h));
ConnectHeaders 信息可在此处找到 - https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/header/Headers.html