将任意 JSON 字符串转换为 Kafka Schema

Converting an arbitrary JSON string to Kafka Schema

我正在成功使用 Kafka Connect,但我有一个 JSON 字符串,我正试图以 common/consistent 方式将其转换为 Kafka 架构。有没有办法将任意 JSON 字符串转换为 Kafka Connect 可以使用的“SourceRecord”。我怀疑有一种简单的方法可以做到这一点,但到目前为止我还没有找到一个 good/working 可以满足我的要求的示例。

现在我可以根据示例成功地转换 JSON 字符串,如下所示。显示的示例很简单,但我希望有一种方法可以对任意复杂的 JSON 字符串执行此操作并创建 SourceRecord?

所需的架构输出(键和值)

(注意:这是在 Kafka Connect 插件“poll()”方法中完成的)

:
:
long l = generateId();
Long id = Long.valueOf(l);
Object key = buildKey(id);
Schema keySchema = HttpSourceSchemas.KEY_SCHEMA;
Object value = buildValue(timestampStr, "hello world");
Schema valueSchema = HttpSourceSchemas.VALUE_SCHEMA;

records.add(new SourceRecord(
                    sourcePartition, sourceOffset, topic, partition,
                    keySchema, key, valueSchema, value));
:
:
private Struct buildKey(Long id) {
    return new Struct(HttpSourceSchemas.KEY_SCHEMA)
                .put(HttpSourceSchemas.ID_FIELD, id);
}

private Struct buildValue(String timestamp, String data) {
    return new Struct(HttpSourceSchemas.VALUE_SCHEMA)
                .put(HttpSourceSchemas.TIMESTAMP_FIELD, timestamp)
                .put(HttpSourceSchemas.DATA_FIELD, data);
}
:
:

我的架构如下所示:

public final class HttpSourceSchemas {

    private HttpSourceSchemas() {
        // Empty
    }

    public static final String ID_FIELD = "id";
    public static final String TIMESTAMP_FIELD = "timestamp";
    public static final String DATA_FIELD = "data";

    public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
        .name("Key Schema")
        .version(1)
        .field(ID_FIELD, Schema.INT64_SCHEMA)
        .build();

    public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
        .name("Value Schema")
        .version(1)
        .field(TIMESTAMP_FIELD, Schema.STRING_SCHEMA)
        .field(DATA_FIELD, Schema.STRING_SCHEMA)
        .build();
}
  1. 选择一个 JSON 处理库,例如 jackson (recommended, as this is included by connect-api dependency) or gson
  2. 解析任意字符串,在Java.
  3. 中得到一个JSON对象
  4. 进行深度优先搜索以遍历 JSON 对象的节点。
  5. 构建其 Kafka 模式。

Kafka JsonConverter 中的一些代码应该对您有所帮助。