设置 STRUCT 类型的默认值时,Kafka Connect API 出错

Kafka Connect API error when set default value of STRUCT type

我想设置一个默认值STRUCT,代码如下:

        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("homeAddress")
                .field("province", SchemaBuilder.STRING_SCHEMA)
                .field("city", SchemaBuilder.STRING_SCHEMA);
        Struct defaultValue = new Struct(schemaBuilder.build())
                .put("province", "aaaa")
                .put("city", "bbbb");
        Schema dataSchema = SchemaBuilder.struct().name("personMessage")
                .field("address", schemaBuilder.defaultValue(defaultValue).build()).build();
        Struct struct = new Struct(dataSchema);

但我得到如下错误

Exception in thread "main" org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
    ... 1 more

我挖掘了 ConnectSchema.validateValue 的代码并找到了抛出异常的原因,

value的schema类型是ConnectSchema,另一个是SchemaBuilder,则抛出异常。

case STRUCT:
    Struct struct = (Struct) value;
    if (!struct.schema().equals(schema))
        throw new DataException("Struct schemas do not match.");
    struct.validate();

等于法

        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConnectSchema schema = (ConnectSchema) o;
        return Objects.equals(optional, schema.optional) &&
                Objects.equals(version, schema.version) &&
                Objects.equals(name, schema.name) &&
                Objects.equals(doc, schema.doc) &&
                Objects.equals(type, schema.type) &&
                Objects.deepEquals(defaultValue, schema.defaultValue) &&
                Objects.equals(fields, schema.fields) &&
                Objects.equals(keySchema, schema.keySchema) &&
                Objects.equals(valueSchema, schema.valueSchema) &&
                Objects.equals(parameters, schema.parameters);

任何人都可以帮助如何设置类型的默认值 STRUCT

下面是方法的代码'defaultValue':

public SchemaBuilder defaultValue(Object value) {
        checkCanSet(DEFAULT_FIELD, defaultValue, value);
        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
        try {
            ConnectSchema.validateValue(this, value);
        } catch (DataException e) {
            throw new SchemaBuilderException("Invalid default value", e);
        }
        defaultValue = value;
        return this;
    }

如果我将 ConnectSchema.validateValue(this, value) 更改为 ConnectSchema.validateValue(this.builder(), value) 似乎就可以了,我不知道其他情况是否可以。

谢谢。

改变这个 Struct defaultValue = new Struct(schemaBuilder.build())
Struct defaultValue = new Struct(schemaBuilder)

ConnectSchemaSchemaBuilder 都 class 正在执行 Schema schemaBuilder.schemaBuilder.build() 正在将所有值传递给 ConnectSchema 构造函数和 return new Schema 对象并且这个对象是不可变的。

看看这个。

System.out.println(struct); //empty 
System.out.println(struct.get("address"));  //default values
System.out.println(  ((Struct)struct.get("address")).getString("city")   ); //default values
System.out.println(  ((Struct)struct.get("address")).getString("province")   ); //default values

默认值不会重写到结构字段。它们仅存在于架构定义中,如果 filed 为空,则默认值是 return 从架构定义中编辑的。

来自Struct.class

  public Object get(Field field) {
        Object val = this.values[field.index()];
        if (val == null && field.schema().defaultValue() != null) {
            val = field.schema().defaultValue();
        }

        return val;
    }