BigQuery 到 Kafka 主题
BigQuery to Kafka Topic
我有一个加载到 BigQuery 的数据,我需要将 BigQuery 连接到 Kafka 作为每天一次将数据发布到 Kafka 主题的来源。由于没有 BigQuery 源连接器,因此将 BigQuery 数据发布到 Kafka 主题的最佳方式是什么。
有多种选择。我会使用 batch Apache Beam pipeline running on Google Dataflow 来完成这个任务。它有 Java 和 Python SDK。
例如在Java中你可以用BigQueryIO and then write it with KafkaIO读取数据。
像这样:
//read rows from BQ
PCollection<TableRow> weatherData = pipeline.apply(
BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations"));
// some data processing steps
// write pairs to Kafka topic
PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.<Long, String>write()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("results")
.withKeySerializer(LongSerializer.class)
.withValueSerializer(StringSerializer.class)
);
我有一个加载到 BigQuery 的数据,我需要将 BigQuery 连接到 Kafka 作为每天一次将数据发布到 Kafka 主题的来源。由于没有 BigQuery 源连接器,因此将 BigQuery 数据发布到 Kafka 主题的最佳方式是什么。
有多种选择。我会使用 batch Apache Beam pipeline running on Google Dataflow 来完成这个任务。它有 Java 和 Python SDK。
例如在Java中你可以用BigQueryIO and then write it with KafkaIO读取数据。
像这样:
//read rows from BQ
PCollection<TableRow> weatherData = pipeline.apply(
BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations"));
// some data processing steps
// write pairs to Kafka topic
PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.<Long, String>write()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("results")
.withKeySerializer(LongSerializer.class)
.withValueSerializer(StringSerializer.class)
);