从本地磁盘将数据加载到 BigQuery 和 Google 云存储的策略
Strategy for loading data into BigQuery and Google cloud Storage from local disk
我从 teradata 中提取的本地磁盘中有 2 年的合并数据,大小约为 300GB。我必须将相同的数据加载到 google 云存储和 BigQuery table。
google 云存储中的最终数据应以压缩格式按天分隔(每天的文件应为 gz 格式的单个文件)。
我还必须按天分区 table 将数据加载到 BigQuery 中,即每天的数据应存储在一个分区中。
我先将2年的合并数据加载到google存储中。然后尝试使用 google 数据流通过使用数据流中的分区概念来逐日分离数据并将其加载到 google 云存储(仅供参考数据流分区与 bigquery 分区不同)。但是数据流不允许创建 730 个分区(2 年),因为它遇到 413 Request Entity Too Large(管道的序列化 JSON 表示的大小超过允许的限制”)。
所以我 运行 数据流作业两次,每年过滤数据。
它过滤了每年的数据并将其写入 google 云存储中的单独文件,但由于数据流目前无法写入压缩文件,因此无法压缩它。
看到第一种方法失败,我想到使用数据流中的分区从组合数据中过滤一年的数据,如上文所述,并将其直接写入 BigQuery,然后将其导出到 google 压缩存储格式。这个过程会重复两次。
但是在这种方法中,我无法一次写入超过 45 天的数据,因为我反复点击 java.lang.OutOfMemoryError: Java heap space issue。所以这个攻略也失败了
对于确定以日期方式隔离迁移到压缩格式的 google 存储和 BigQuery 的策略有什么帮助吗?
让我们看看这是否有帮助?
步骤+伪代码
1 - 将组合数据 (300GB) 上传到 BigQuery 到 CombinedData table
2 - 拆分年(成本 1x2x300GB = 600GB)
SELECT * FROM CombinedData WHERE year = year1 -> write to DataY1 table
SELECT * FROM CombinedData WHERE year = year2 -> write to DataY2 table
3 - 分成 6 个月(成本 2x2x150GB = 600GB)
SELECT * FROM DataY1 WHERE month in (1,2,3,4,5,6) -> write to DataY1H1 table
SELECT * FROM DataY1 WHERE month in (7,8,9,10,11,12) -> write to DataY1H2 table
SELECT * FROM DataY2 WHERE month in (1,2,3,4,5,6) -> write to DataY2H1 table
SELECT * FROM DataY2 WHERE month in (7,8,9,10,11,12) -> write to DataY2H2 table
4 - 分成 3 个月(成本 4x2x75GB = 600GB)
SELECT * FROM DataY1H1 WHERE month in (1,2,3) -> write to DataY1Q1 table
SELECT * FROM DataY1H1 WHERE month in (4,5,6) -> write to DataY1Q2 table
SELECT * FROM DataY1H2 WHERE month in (7,8,9) -> write to DataY1Q3 table
SELECT * FROM DataY1H2 WHERE month in (10,11,12) -> write to DataY1Q4 table
SELECT * FROM DataY2H1 WHERE month in (1,2,3) -> write to DataY2Q1 table
SELECT * FROM DataY2H1 WHERE month in (4,5,6) -> write to DataY2Q2 table
SELECT * FROM DataY2H2 WHERE month in (7,8,9) -> write to DataY2Q3 table
SELECT * FROM DataY2H2 WHERE month in (10,11,12) -> write to DataY2Q4 table
5 - 将每个季度分成 1 个月和 2 个月(成本 8x2x37.5GB = 600GB)
SELECT * FROM DataY1Q1 WHERE month = 1 -> write to DataY1M01 table
SELECT * FROM DataY1Q1 WHERE month in (2,3) -> write to DataY1M02-03 table
SELECT * FROM DataY1Q2 WHERE month = 4 -> write to DataY1M04 table
SELECT * FROM DataY1Q2 WHERE month in (5,6) -> write to DataY1M05-06 table
其余 Y(1/2)Q(1-4) 相同 tables
6 - 将所有双月 table 分成单独的月 table(成本 8x2x25GB = 400GB)
SELECT * FROM DataY1M002-03 WHERE month = 2 -> write to DataY1M02 table
SELECT * FROM DataY1M002-03 WHERE month = 3 -> write to DataY1M03 table
SELECT * FROM DataY1M005-06 WHERE month = 5 -> write to DataY1M05 table
SELECT * FROM DataY1M005-06 WHERE month = 6 -> write to DataY1M06 table
其余 Y(1/2)M(XX-YY) 相同 tables
7 - 最后你有 24 个每月 tables,现在我希望你面临的限制会消失,这样你就可以继续你的计划 - 比如说第二种方法 - 每天进一步拆分 tables
我认为,从成本角度来看,这是最佳方法,最终查询成本为
(假设计费等级 1)
4x600GB + 400GB = 2800GB =
当然别忘了删除中间的tables
注意:我对这个计划不满意 - 但如果将原始文件拆分为 BigQuery 之外的日常块不是一个选项 - 这可能会有所帮助
目前,分割结果是产生多个输出的最佳方式files/tables。您可能 运行 的事实是每次写入都会为上传分配一个缓冲区,因此如果您有一个分区后跟 N 次写入,则有 N 个缓冲区。
实现这项工作有两种策略。
- 您可以使用 GcsOptions 中的
uploadBufferSizeBytes
选项减小上传缓冲区的大小。请注意,这可能会减慢上传速度,因为需要更频繁地刷新缓冲区。
- 您可以对分区后的每个
PCollection
应用一个 Reshuffle
操作。这将同时限制并发 BigQuery 接收器的数量 运行,因此分配的缓冲区会更少。
例如,您可以这样做:
PCollection<Data> allData = ...;
PCollectionList<Data> partitions = allData.apply(Partition.of(...));
// Assuming the partitioning function has produced numDays partitions,
// and those can be mapped back to the day in some meaningful way:
for (int i = 0; i < numDays; i++) {
String outputName = nameFor(i); // compute the output name
partitions.get(i)
.apply("Write_" + outputName, ReshuffleAndWrite(outputName));
}
这利用了这两个辅助 PTransforms:
private static class Reshuffle<T>
extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> apply(PCollection<T> in) {
return in
.apply("Random Key", WithKeys.of(
new SerializableFunction<T, Integer>() {
@Override
public Integer apply(Data value) {
return ThreadLocalRandom.current().nextInt();
}
}))
.apply("Shuffle", GroupByKey.<Integer, T>create())
.apply("Remove Key", Values.create());
}
}
private static class ReshuffleAndWrite
extends PTransform<PCollection<Data>, PDone> {
private final String outputName;
public ReshuffleAndWrite(String outputName) {
this.outputName = outputName;
}
@Override
public PDone apply(PCollection<Data> in) {
return in
.apply("Reshuffle", new Reshuffle<Data>())
.apply("Write", BigQueryIO.Write.to(tableNameFor(outputName)
.withSchema(schema)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
}
}
我从 teradata 中提取的本地磁盘中有 2 年的合并数据,大小约为 300GB。我必须将相同的数据加载到 google 云存储和 BigQuery table。
google 云存储中的最终数据应以压缩格式按天分隔(每天的文件应为 gz 格式的单个文件)。 我还必须按天分区 table 将数据加载到 BigQuery 中,即每天的数据应存储在一个分区中。
我先将2年的合并数据加载到google存储中。然后尝试使用 google 数据流通过使用数据流中的分区概念来逐日分离数据并将其加载到 google 云存储(仅供参考数据流分区与 bigquery 分区不同)。但是数据流不允许创建 730 个分区(2 年),因为它遇到 413 Request Entity Too Large(管道的序列化 JSON 表示的大小超过允许的限制”)。
所以我 运行 数据流作业两次,每年过滤数据。 它过滤了每年的数据并将其写入 google 云存储中的单独文件,但由于数据流目前无法写入压缩文件,因此无法压缩它。
看到第一种方法失败,我想到使用数据流中的分区从组合数据中过滤一年的数据,如上文所述,并将其直接写入 BigQuery,然后将其导出到 google 压缩存储格式。这个过程会重复两次。 但是在这种方法中,我无法一次写入超过 45 天的数据,因为我反复点击 java.lang.OutOfMemoryError: Java heap space issue。所以这个攻略也失败了
对于确定以日期方式隔离迁移到压缩格式的 google 存储和 BigQuery 的策略有什么帮助吗?
让我们看看这是否有帮助?
步骤+伪代码
1 - 将组合数据 (300GB) 上传到 BigQuery 到 CombinedData table
2 - 拆分年(成本 1x2x300GB = 600GB)
SELECT * FROM CombinedData WHERE year = year1 -> write to DataY1 table
SELECT * FROM CombinedData WHERE year = year2 -> write to DataY2 table
3 - 分成 6 个月(成本 2x2x150GB = 600GB)
SELECT * FROM DataY1 WHERE month in (1,2,3,4,5,6) -> write to DataY1H1 table
SELECT * FROM DataY1 WHERE month in (7,8,9,10,11,12) -> write to DataY1H2 table
SELECT * FROM DataY2 WHERE month in (1,2,3,4,5,6) -> write to DataY2H1 table
SELECT * FROM DataY2 WHERE month in (7,8,9,10,11,12) -> write to DataY2H2 table
4 - 分成 3 个月(成本 4x2x75GB = 600GB)
SELECT * FROM DataY1H1 WHERE month in (1,2,3) -> write to DataY1Q1 table
SELECT * FROM DataY1H1 WHERE month in (4,5,6) -> write to DataY1Q2 table
SELECT * FROM DataY1H2 WHERE month in (7,8,9) -> write to DataY1Q3 table
SELECT * FROM DataY1H2 WHERE month in (10,11,12) -> write to DataY1Q4 table
SELECT * FROM DataY2H1 WHERE month in (1,2,3) -> write to DataY2Q1 table
SELECT * FROM DataY2H1 WHERE month in (4,5,6) -> write to DataY2Q2 table
SELECT * FROM DataY2H2 WHERE month in (7,8,9) -> write to DataY2Q3 table
SELECT * FROM DataY2H2 WHERE month in (10,11,12) -> write to DataY2Q4 table
5 - 将每个季度分成 1 个月和 2 个月(成本 8x2x37.5GB = 600GB)
SELECT * FROM DataY1Q1 WHERE month = 1 -> write to DataY1M01 table
SELECT * FROM DataY1Q1 WHERE month in (2,3) -> write to DataY1M02-03 table
SELECT * FROM DataY1Q2 WHERE month = 4 -> write to DataY1M04 table
SELECT * FROM DataY1Q2 WHERE month in (5,6) -> write to DataY1M05-06 table
其余 Y(1/2)Q(1-4) 相同 tables
6 - 将所有双月 table 分成单独的月 table(成本 8x2x25GB = 400GB)
SELECT * FROM DataY1M002-03 WHERE month = 2 -> write to DataY1M02 table
SELECT * FROM DataY1M002-03 WHERE month = 3 -> write to DataY1M03 table
SELECT * FROM DataY1M005-06 WHERE month = 5 -> write to DataY1M05 table
SELECT * FROM DataY1M005-06 WHERE month = 6 -> write to DataY1M06 table
其余 Y(1/2)M(XX-YY) 相同 tables
7 - 最后你有 24 个每月 tables,现在我希望你面临的限制会消失,这样你就可以继续你的计划 - 比如说第二种方法 - 每天进一步拆分 tables
我认为,从成本角度来看,这是最佳方法,最终查询成本为
(假设计费等级 1)
4x600GB + 400GB = 2800GB =
当然别忘了删除中间的tables
注意:我对这个计划不满意 - 但如果将原始文件拆分为 BigQuery 之外的日常块不是一个选项 - 这可能会有所帮助
目前,分割结果是产生多个输出的最佳方式files/tables。您可能 运行 的事实是每次写入都会为上传分配一个缓冲区,因此如果您有一个分区后跟 N 次写入,则有 N 个缓冲区。
实现这项工作有两种策略。
- 您可以使用 GcsOptions 中的
uploadBufferSizeBytes
选项减小上传缓冲区的大小。请注意,这可能会减慢上传速度,因为需要更频繁地刷新缓冲区。 - 您可以对分区后的每个
PCollection
应用一个Reshuffle
操作。这将同时限制并发 BigQuery 接收器的数量 运行,因此分配的缓冲区会更少。
例如,您可以这样做:
PCollection<Data> allData = ...;
PCollectionList<Data> partitions = allData.apply(Partition.of(...));
// Assuming the partitioning function has produced numDays partitions,
// and those can be mapped back to the day in some meaningful way:
for (int i = 0; i < numDays; i++) {
String outputName = nameFor(i); // compute the output name
partitions.get(i)
.apply("Write_" + outputName, ReshuffleAndWrite(outputName));
}
这利用了这两个辅助 PTransforms:
private static class Reshuffle<T>
extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> apply(PCollection<T> in) {
return in
.apply("Random Key", WithKeys.of(
new SerializableFunction<T, Integer>() {
@Override
public Integer apply(Data value) {
return ThreadLocalRandom.current().nextInt();
}
}))
.apply("Shuffle", GroupByKey.<Integer, T>create())
.apply("Remove Key", Values.create());
}
}
private static class ReshuffleAndWrite
extends PTransform<PCollection<Data>, PDone> {
private final String outputName;
public ReshuffleAndWrite(String outputName) {
this.outputName = outputName;
}
@Override
public PDone apply(PCollection<Data> in) {
return in
.apply("Reshuffle", new Reshuffle<Data>())
.apply("Write", BigQueryIO.Write.to(tableNameFor(outputName)
.withSchema(schema)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
}
}