使用 Apache Beam 仅从 Kafka 主题中获取字段的子集

Get only a subset of fields from a Kafka topic using Apache Beam

有没有办法只读取 Kafka 主题的特定字段?

我有一个主题,比如 person 和架构 personSchema。该架构包含许多字段,例如 idnameaddresscontactdateOfBirth.

我只想得到 idnameaddress。我该怎么做?

目前我正在使用 Apache Beam 读取流,并打算之后将数据写入 BigQuery。我正在尝试使用 Filter 但由于 Boolean return type

而无法使其工作

这是我的代码:

Pipeline pipeline = Pipeline.create();
PCollection<KV<String, Person>> kafkaStreams =
                pipeline
               .apply("read streams", dataIO.readStreams(topic))
               .apply(Filter.by(new SerializableFunction<KV<String, Person>, Boolean>() {
                  @Override
                  public Boolean apply(KV<String, Order> input) {
                     return input.getValue().get("address").equals(true);
                  }
                  }));

其中 dataIO.readStreams 是 return 这个:

return KafkaIO.<String, Person>read()
                .withTopic(topic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(PersonAvroDeserializer.class)
                .withConsumerConfigUpdates(consumer)
                .withoutMetadata();

对于可能的解决方案,我将不胜感激。

您可以使用 ksqlDB, which also work directly with Kafka Connect for which there is a sink connector for BigQuery

CREATE STREAM MY_SOURCE WITH (KAFKA_TOPIC='person', VALUE_FORMAT=AVRO');

CREATE STREAM FILTERED_STREAM AS SELECT id, name, address FROM MY_SOURCE;

CREATE SINK CONNECTOR SINK_BQ_01 WITH (
  'connector.class' = 'com.wepay.kafka.connect.bigquery.BigQuerySinkConnector',
  'topics' = 'FILTERED_STREAM',
…
);

您也可以自己创建一个新的 TableSchema,只包含必填字段。稍后当您写入 BigQuery 时,您可以将新创建​​的架构作为参数而不是旧架构传递。

        TableSchema schema = new TableSchema();
        List<TableFieldSchema> tableFields = new ArrayList<TableFieldSchema>();

        TableFieldSchema id =
                new TableFieldSchema()
                        .setName("id")
                        .setType("STRING")
                        .setMode("NULLABLE");

        tableFields.add(id);
       
        schema.setFields(tableFields);
        return schema;

我还应该提到,如果您在某个时候将 AVRO 记录转换为 BigQuery 的 TableRow,您可能也需要在那里实施一些检查。