将任意 JSON 字符串转换为 Kafka Schema
Converting an arbitrary JSON string to Kafka Schema
我正在成功使用 Kafka Connect,但我有一个 JSON 字符串,我正试图以 common/consistent 方式将其转换为 Kafka 架构。有没有办法将任意 JSON 字符串转换为 Kafka Connect 可以使用的“SourceRecord”。我怀疑有一种简单的方法可以做到这一点,但到目前为止我还没有找到一个 good/working 可以满足我的要求的示例。
现在我可以根据示例成功地转换 JSON 字符串,如下所示。显示的示例很简单,但我希望有一种方法可以对任意复杂的 JSON 字符串执行此操作并创建 SourceRecord?
所需的架构输出(键和值)
(注意:这是在 Kafka Connect 插件“poll()”方法中完成的)
:
:
long l = generateId();
Long id = Long.valueOf(l);
Object key = buildKey(id);
Schema keySchema = HttpSourceSchemas.KEY_SCHEMA;
Object value = buildValue(timestampStr, "hello world");
Schema valueSchema = HttpSourceSchemas.VALUE_SCHEMA;
records.add(new SourceRecord(
sourcePartition, sourceOffset, topic, partition,
keySchema, key, valueSchema, value));
:
:
private Struct buildKey(Long id) {
return new Struct(HttpSourceSchemas.KEY_SCHEMA)
.put(HttpSourceSchemas.ID_FIELD, id);
}
private Struct buildValue(String timestamp, String data) {
return new Struct(HttpSourceSchemas.VALUE_SCHEMA)
.put(HttpSourceSchemas.TIMESTAMP_FIELD, timestamp)
.put(HttpSourceSchemas.DATA_FIELD, data);
}
:
:
我的架构如下所示:
public final class HttpSourceSchemas {
private HttpSourceSchemas() {
// Empty
}
public static final String ID_FIELD = "id";
public static final String TIMESTAMP_FIELD = "timestamp";
public static final String DATA_FIELD = "data";
public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
.name("Key Schema")
.version(1)
.field(ID_FIELD, Schema.INT64_SCHEMA)
.build();
public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
.name("Value Schema")
.version(1)
.field(TIMESTAMP_FIELD, Schema.STRING_SCHEMA)
.field(DATA_FIELD, Schema.STRING_SCHEMA)
.build();
}
- 选择一个 JSON 处理库,例如 jackson (recommended, as this is included by
connect-api
dependency) or gson
- 解析任意字符串,在Java.
中得到一个JSON对象
- 进行深度优先搜索以遍历 JSON 对象的节点。
- 构建其 Kafka 模式。
Kafka JsonConverter 中的一些代码应该对您有所帮助。
我正在成功使用 Kafka Connect,但我有一个 JSON 字符串,我正试图以 common/consistent 方式将其转换为 Kafka 架构。有没有办法将任意 JSON 字符串转换为 Kafka Connect 可以使用的“SourceRecord”。我怀疑有一种简单的方法可以做到这一点,但到目前为止我还没有找到一个 good/working 可以满足我的要求的示例。
现在我可以根据示例成功地转换 JSON 字符串,如下所示。显示的示例很简单,但我希望有一种方法可以对任意复杂的 JSON 字符串执行此操作并创建 SourceRecord?
所需的架构输出(键和值)(注意:这是在 Kafka Connect 插件“poll()”方法中完成的)
:
:
long l = generateId();
Long id = Long.valueOf(l);
Object key = buildKey(id);
Schema keySchema = HttpSourceSchemas.KEY_SCHEMA;
Object value = buildValue(timestampStr, "hello world");
Schema valueSchema = HttpSourceSchemas.VALUE_SCHEMA;
records.add(new SourceRecord(
sourcePartition, sourceOffset, topic, partition,
keySchema, key, valueSchema, value));
:
:
private Struct buildKey(Long id) {
return new Struct(HttpSourceSchemas.KEY_SCHEMA)
.put(HttpSourceSchemas.ID_FIELD, id);
}
private Struct buildValue(String timestamp, String data) {
return new Struct(HttpSourceSchemas.VALUE_SCHEMA)
.put(HttpSourceSchemas.TIMESTAMP_FIELD, timestamp)
.put(HttpSourceSchemas.DATA_FIELD, data);
}
:
:
我的架构如下所示:
public final class HttpSourceSchemas {
private HttpSourceSchemas() {
// Empty
}
public static final String ID_FIELD = "id";
public static final String TIMESTAMP_FIELD = "timestamp";
public static final String DATA_FIELD = "data";
public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
.name("Key Schema")
.version(1)
.field(ID_FIELD, Schema.INT64_SCHEMA)
.build();
public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
.name("Value Schema")
.version(1)
.field(TIMESTAMP_FIELD, Schema.STRING_SCHEMA)
.field(DATA_FIELD, Schema.STRING_SCHEMA)
.build();
}
- 选择一个 JSON 处理库,例如 jackson (recommended, as this is included by
connect-api
dependency) or gson - 解析任意字符串,在Java. 中得到一个JSON对象
- 进行深度优先搜索以遍历 JSON 对象的节点。
- 构建其 Kafka 模式。
Kafka JsonConverter 中的一些代码应该对您有所帮助。