使用 Apache Beam 仅从 Kafka 主题中获取字段的子集
Get only a subset of fields from a Kafka topic using Apache Beam
有没有办法只读取 Kafka 主题的特定字段?
我有一个主题,比如 person
和架构 personSchema
。该架构包含许多字段,例如 id
、name
、address
、contact
、dateOfBirth
.
我只想得到 id
、name
和 address
。我该怎么做?
目前我正在使用 Apache Beam 读取流,并打算之后将数据写入 BigQuery。我正在尝试使用 Filter
但由于 Boolean return type
而无法使其工作
这是我的代码:
Pipeline pipeline = Pipeline.create();
PCollection<KV<String, Person>> kafkaStreams =
pipeline
.apply("read streams", dataIO.readStreams(topic))
.apply(Filter.by(new SerializableFunction<KV<String, Person>, Boolean>() {
@Override
public Boolean apply(KV<String, Order> input) {
return input.getValue().get("address").equals(true);
}
}));
其中 dataIO.readStreams
是 return 这个:
return KafkaIO.<String, Person>read()
.withTopic(topic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(PersonAvroDeserializer.class)
.withConsumerConfigUpdates(consumer)
.withoutMetadata();
对于可能的解决方案,我将不胜感激。
您可以使用 ksqlDB, which also work directly with Kafka Connect for which there is a sink connector for BigQuery
CREATE STREAM MY_SOURCE WITH (KAFKA_TOPIC='person', VALUE_FORMAT=AVRO');
CREATE STREAM FILTERED_STREAM AS SELECT id, name, address FROM MY_SOURCE;
CREATE SINK CONNECTOR SINK_BQ_01 WITH (
'connector.class' = 'com.wepay.kafka.connect.bigquery.BigQuerySinkConnector',
'topics' = 'FILTERED_STREAM',
…
);
您也可以自己创建一个新的 TableSchema
,只包含必填字段。稍后当您写入 BigQuery 时,您可以将新创建的架构作为参数而不是旧架构传递。
TableSchema schema = new TableSchema();
List<TableFieldSchema> tableFields = new ArrayList<TableFieldSchema>();
TableFieldSchema id =
new TableFieldSchema()
.setName("id")
.setType("STRING")
.setMode("NULLABLE");
tableFields.add(id);
schema.setFields(tableFields);
return schema;
我还应该提到,如果您在某个时候将 AVRO 记录转换为 BigQuery 的 TableRow
,您可能也需要在那里实施一些检查。
有没有办法只读取 Kafka 主题的特定字段?
我有一个主题,比如 person
和架构 personSchema
。该架构包含许多字段,例如 id
、name
、address
、contact
、dateOfBirth
.
我只想得到 id
、name
和 address
。我该怎么做?
目前我正在使用 Apache Beam 读取流,并打算之后将数据写入 BigQuery。我正在尝试使用 Filter
但由于 Boolean return type
这是我的代码:
Pipeline pipeline = Pipeline.create();
PCollection<KV<String, Person>> kafkaStreams =
pipeline
.apply("read streams", dataIO.readStreams(topic))
.apply(Filter.by(new SerializableFunction<KV<String, Person>, Boolean>() {
@Override
public Boolean apply(KV<String, Order> input) {
return input.getValue().get("address").equals(true);
}
}));
其中 dataIO.readStreams
是 return 这个:
return KafkaIO.<String, Person>read()
.withTopic(topic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(PersonAvroDeserializer.class)
.withConsumerConfigUpdates(consumer)
.withoutMetadata();
对于可能的解决方案,我将不胜感激。
您可以使用 ksqlDB, which also work directly with Kafka Connect for which there is a sink connector for BigQuery
CREATE STREAM MY_SOURCE WITH (KAFKA_TOPIC='person', VALUE_FORMAT=AVRO');
CREATE STREAM FILTERED_STREAM AS SELECT id, name, address FROM MY_SOURCE;
CREATE SINK CONNECTOR SINK_BQ_01 WITH (
'connector.class' = 'com.wepay.kafka.connect.bigquery.BigQuerySinkConnector',
'topics' = 'FILTERED_STREAM',
…
);
您也可以自己创建一个新的 TableSchema
,只包含必填字段。稍后当您写入 BigQuery 时,您可以将新创建的架构作为参数而不是旧架构传递。
TableSchema schema = new TableSchema();
List<TableFieldSchema> tableFields = new ArrayList<TableFieldSchema>();
TableFieldSchema id =
new TableFieldSchema()
.setName("id")
.setType("STRING")
.setMode("NULLABLE");
tableFields.add(id);
schema.setFields(tableFields);
return schema;
我还应该提到,如果您在某个时候将 AVRO 记录转换为 BigQuery 的 TableRow
,您可能也需要在那里实施一些检查。