将 PubSub 流保存到 GCS 中的分区镶木地板文件
Save PubSub stream to a partitioned parquet file in GCS
我有一个 spark-streaming 应用程序,它从 pubsub 主题(例如 kafka)读取消息,对它们中的每一个应用一些转换,并将它们保存为 GCS 中的镶木地板文件,按任意列分区。使用结构化流和 spark-gcs 连接器相对容易做到这一点。
例如,每条消息如下所示:
{
"app_id": "app1",
"user_id": "u001",
"evt_timestamp": 1617105047,
"evt_data": { ... }
}
我将其作为结构化流式数据帧读取,然后将其分区,例如app_id
和 user_id
,然后将其保存到 GCS 存储桶中,然后看起来像这样:
gs://my-bucket/data/app_id=app1/user_id=u001/XXX.part
gs://my-bucket/data/app_id=app1/user_id=u002/XXX.part
gs://my-bucket/data/app_id=app1/user_id=u003/XXX.part
gs://my-bucket/data/app_id=app2/user_id=u001/XXX.part
...
我想将我的数据处理转移到 GCP,这样我就不必管理我的 Spark 基础设施。我可以重写我的应用程序以在 Dataproc 上使用 DStreams 和 运行 它,但重要的人不愿意使用 Spark。
我一直没能找到一种方法来分区我的数据。 BigQuery支持集群,这似乎是我所需要的,但我仍然需要不断地将它保存到GCS。是否可以在 GCP 中轻松完成,或者我的用例是否以某种方式损坏?
编辑:
正如已接受的答案所建议的那样,我设法使用 writeDynamic
和我对 FileIO.Write.FileNaming
的实现实现了这一点。
大致是这样的:
PCollection<String> pubsubMessages = ... // read json string messages from pubsub
PCollection<ParsedMessage> messages = pubsubMessages
.apply(ParDo.of(new ParseMessage())) // convert json pubsub message to a java bean
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(2))));
FileIO.Write<Partition, JsonMessage> writer = FileIO.<Partition, JsonMessage>writeDynamic()
.by(jsonMessage -> new Partition(/* some jsonMessage fields */))
.via(
Contextful.fn(JsonMessage::toRecord), // convert message to Sink type, in this case GenericRecord
ParquetIO.sink(OUT_SCHEMA)) // create a parquet sink
.withNaming(part -> new PartitionFileName(/* file name based on `part` fields */))
.withDestinationCoder(AvroCoder.of(Partition.class, Partition.SCHEMA))
.withNumShards(1)
.to("output");
PartitionFileName 可以如下所示
class PartFileName implements FileIO.Write.FileNaming {
private final String[] partNames;
private final Serializable[] partValues;
public PartFileName(String[] partNames, Serializable[] partValues) {
this.partNames = partNames;
this.partValues = partValues;
}
@Override
public String getFilename(
BoundedWindow window,
PaneInfo pane,
int numShards,
int shardIndex,
Compression compression) {
StringBuilder dir = new StringBuilder();
for (int i = 0; i < this.partNames.length; i++) {
dir
.append(partNames[i])
.append("=")
.append(partValues[i])
.append("/");
}
String fileName = String.format("%d_%d_%d.part", shardIndex, numShards, window.maxTimestamp().getMillis());
return String.format("%s/%s", dir.toString(), fileName);
}
}
这导致目录结构类似于
output/date=20200301/app_id=1001/0_1_1617727449999.part
我相信您正在寻找 Apache Beam/Google Cloud Dataflow streaming pipelines 的 Pubsub。
是的,它可以毫不费力地做你想做的事。您可以在流媒体上定义 windows,然后使用 Parquet IO 将其写入 GCS。
虽然不是 Parquet,this example 从 Pubsub 读取并将文本文件写入 GCS。
要实现动态文件名功能,FileIO 的 writeDynamic
和您自己的 FilenamePolicy
应该可以很好地工作。
我有一个 spark-streaming 应用程序,它从 pubsub 主题(例如 kafka)读取消息,对它们中的每一个应用一些转换,并将它们保存为 GCS 中的镶木地板文件,按任意列分区。使用结构化流和 spark-gcs 连接器相对容易做到这一点。 例如,每条消息如下所示:
{
"app_id": "app1",
"user_id": "u001",
"evt_timestamp": 1617105047,
"evt_data": { ... }
}
我将其作为结构化流式数据帧读取,然后将其分区,例如app_id
和 user_id
,然后将其保存到 GCS 存储桶中,然后看起来像这样:
gs://my-bucket/data/app_id=app1/user_id=u001/XXX.part
gs://my-bucket/data/app_id=app1/user_id=u002/XXX.part
gs://my-bucket/data/app_id=app1/user_id=u003/XXX.part
gs://my-bucket/data/app_id=app2/user_id=u001/XXX.part
...
我想将我的数据处理转移到 GCP,这样我就不必管理我的 Spark 基础设施。我可以重写我的应用程序以在 Dataproc 上使用 DStreams 和 运行 它,但重要的人不愿意使用 Spark。 我一直没能找到一种方法来分区我的数据。 BigQuery支持集群,这似乎是我所需要的,但我仍然需要不断地将它保存到GCS。是否可以在 GCP 中轻松完成,或者我的用例是否以某种方式损坏?
编辑:
正如已接受的答案所建议的那样,我设法使用 writeDynamic
和我对 FileIO.Write.FileNaming
的实现实现了这一点。
大致是这样的:
PCollection<String> pubsubMessages = ... // read json string messages from pubsub
PCollection<ParsedMessage> messages = pubsubMessages
.apply(ParDo.of(new ParseMessage())) // convert json pubsub message to a java bean
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(2))));
FileIO.Write<Partition, JsonMessage> writer = FileIO.<Partition, JsonMessage>writeDynamic()
.by(jsonMessage -> new Partition(/* some jsonMessage fields */))
.via(
Contextful.fn(JsonMessage::toRecord), // convert message to Sink type, in this case GenericRecord
ParquetIO.sink(OUT_SCHEMA)) // create a parquet sink
.withNaming(part -> new PartitionFileName(/* file name based on `part` fields */))
.withDestinationCoder(AvroCoder.of(Partition.class, Partition.SCHEMA))
.withNumShards(1)
.to("output");
PartitionFileName 可以如下所示
class PartFileName implements FileIO.Write.FileNaming {
private final String[] partNames;
private final Serializable[] partValues;
public PartFileName(String[] partNames, Serializable[] partValues) {
this.partNames = partNames;
this.partValues = partValues;
}
@Override
public String getFilename(
BoundedWindow window,
PaneInfo pane,
int numShards,
int shardIndex,
Compression compression) {
StringBuilder dir = new StringBuilder();
for (int i = 0; i < this.partNames.length; i++) {
dir
.append(partNames[i])
.append("=")
.append(partValues[i])
.append("/");
}
String fileName = String.format("%d_%d_%d.part", shardIndex, numShards, window.maxTimestamp().getMillis());
return String.format("%s/%s", dir.toString(), fileName);
}
}
这导致目录结构类似于
output/date=20200301/app_id=1001/0_1_1617727449999.part
我相信您正在寻找 Apache Beam/Google Cloud Dataflow streaming pipelines 的 Pubsub。
是的,它可以毫不费力地做你想做的事。您可以在流媒体上定义 windows,然后使用 Parquet IO 将其写入 GCS。
虽然不是 Parquet,this example 从 Pubsub 读取并将文本文件写入 GCS。
要实现动态文件名功能,FileIO 的 writeDynamic
和您自己的 FilenamePolicy
应该可以很好地工作。