Kafka 连接自定义转换:重写 apply() 方法似乎没有效果,但重写键和值中的 newRecord 方法有效
Kafka Connect Custom Transformation : Override of apply() method seems to have no effect, but overriding newRecord method in key and value works
我正在尝试按照示例中使用的模式创建自定义连接器
我已经重写了 apply 方法来向有效负载添加一个小字符串
public R apply(R record) {
log.info("Transformation apply has started...");
String newValue = record.value().toString() + " this has been transformed" ;
log.info("Message changed to : " + newValue);
return newRecord(record, null, newValue);
}
这完全没有效果。输出没有改变。我也没有在生成的日志中的任何地方看到日志信息消息。
但是,如果我覆盖值 class 的 newRecord 方法,我可以看到正在更新的消息。我还看到正在记录信息消息。
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
log.info("Message Value being changed to :");
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
updatedSchema, "This is updated : " + updatedValue.toString(), record.timestamp());
}
这会导致输出消息按预期更新。请注意,消息中的负载是字符串负载,没有任何特定结构。
我在想这种负载操作也可以发生在 apply() 方法中。此外,我没有看到在转换 运行 中调用此 apply() 方法。负载不受影响,消息也不会被记录。
我是不是漏掉了什么。 apply() 方法是否被调用或使用不正确。任何指导表示赞赏。
注意:覆盖键 class 的 newRecord 方法也有效。
转换的完整源代码如下
package com.xxxxx.yyyyy.kafka.connect.transform;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.HoistField;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public abstract class SampleTransform<R extends ConnectRecord<R>> implements Transformation<R> {
private static Logger log = LoggerFactory.getLogger(SampleTransform.class);
@Override
public void configure(Map<String, ?> map) {
}
@Override
public R apply(R record) {
log.info("Transformation apply has started..."); //This does not work
String newValue = record.value().toString() + " this has been transformed" ;
log.info("Message changed to : " + newValue);
return newRecord(record, null, "Message is fully changed"); //Output does not change
}
protected abstract Schema operatingSchema(R record);
protected abstract Object operatingValue(R record);
protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
public static class Key<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}
@Override
protected Object operatingValue(R record) {
return record.key();
}
@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
log.info("Key being changed to : " + updatedValue.toString()); //This is logged
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}
}
public static class Value<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}
@Override
protected Object operatingValue(R record) {
return record.value();
}
@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
log.info("Message Value being changed to :"); // This is logged
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, "This is updated : " + updatedValue.toString(), record.timestamp()); //This works
}
}
}
问题是,您扩展了 org.apache.kafka.connect.transforms.HoistField
而不是 Transformation - SampleTransform
我正在尝试按照示例中使用的模式创建自定义连接器
我已经重写了 apply 方法来向有效负载添加一个小字符串
public R apply(R record) {
log.info("Transformation apply has started...");
String newValue = record.value().toString() + " this has been transformed" ;
log.info("Message changed to : " + newValue);
return newRecord(record, null, newValue);
}
这完全没有效果。输出没有改变。我也没有在生成的日志中的任何地方看到日志信息消息。
但是,如果我覆盖值 class 的 newRecord 方法,我可以看到正在更新的消息。我还看到正在记录信息消息。
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
log.info("Message Value being changed to :");
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
updatedSchema, "This is updated : " + updatedValue.toString(), record.timestamp());
}
这会导致输出消息按预期更新。请注意,消息中的负载是字符串负载,没有任何特定结构。
我在想这种负载操作也可以发生在 apply() 方法中。此外,我没有看到在转换 运行 中调用此 apply() 方法。负载不受影响,消息也不会被记录。
我是不是漏掉了什么。 apply() 方法是否被调用或使用不正确。任何指导表示赞赏。
注意:覆盖键 class 的 newRecord 方法也有效。
转换的完整源代码如下
package com.xxxxx.yyyyy.kafka.connect.transform;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.HoistField;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public abstract class SampleTransform<R extends ConnectRecord<R>> implements Transformation<R> {
private static Logger log = LoggerFactory.getLogger(SampleTransform.class);
@Override
public void configure(Map<String, ?> map) {
}
@Override
public R apply(R record) {
log.info("Transformation apply has started..."); //This does not work
String newValue = record.value().toString() + " this has been transformed" ;
log.info("Message changed to : " + newValue);
return newRecord(record, null, "Message is fully changed"); //Output does not change
}
protected abstract Schema operatingSchema(R record);
protected abstract Object operatingValue(R record);
protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
public static class Key<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}
@Override
protected Object operatingValue(R record) {
return record.key();
}
@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
log.info("Key being changed to : " + updatedValue.toString()); //This is logged
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}
}
public static class Value<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}
@Override
protected Object operatingValue(R record) {
return record.value();
}
@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
log.info("Message Value being changed to :"); // This is logged
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, "This is updated : " + updatedValue.toString(), record.timestamp()); //This works
}
}
}
问题是,您扩展了 org.apache.kafka.connect.transforms.HoistField
而不是 Transformation - SampleTransform