在 Spark SQL Thrift Server 中指定缓存数据的分区数

Specifying the number of partitions for cached data in Spark SQL Thrift Server

我在 HDFS 中存储了大约 3000 个 parquet 文件(扩展名为 .gz.parquet),大小从 40 MB 到 160 MB 不等。 HDFS 块大小为 128 MB。所有文件的总大小约为 360 GB。所有文件都在同一目录中,即数据未分区。

我在 16 个节点上启动 Spark SQL Thrift Server 并为每个执行程序指定 160 GB 内存。每个节点有 256 GB 内存和 32 个内核。

我使用 "create external table acdata (....) stored as parquet location '/user/ac/data'"

创建了一个外部 table

然后我使用 "cache table acdata" 缓存数据。这样做会按预期创建大约 4000 个分区。数据在整个集群中占用大约 1200 GB 的内存,并且应该都适合内存,因为我有 1280 GB (16 * 160 GB / 2) 可用于缓存数据。分区范围从 530 MB 到 2 MB。问题是分区没有均匀分布在节点上。一些节点有 50 GB 缓存数据,其他节点有 80 GB 缓存数据。

我 运行 来自 JDBC 客户端的 "Select" 查询。当缓存数据较少的节点处理完它们的本地数据时,它们开始处理缓存在其他节点上的数据,从而导致这些数据通过网络发送。这会导致 select 语句花费更长的时间。

无法对存储在 HDFS 上的数据进行重新分区,因为每天都有数据添加到该目录中,并且每天的数据大小都不同。我必须每天对所有数据重新分区,而不是将数据增量添加到同一目录。

理想情况下,我想找出一种方法将数据均匀分布在所有节点上,以便所有任务花费相同的时间。

如果那不可能,我希望节点只处理本地缓存的数据。我可以通过增加 "spark.locality.wait" 的值来获得一点性能提升,但这会影响所有任务。

让我们将原来的 table 命名为 acdataRaw。如果创建新视图 acdata

CREATE VIEW acdata
AS
SELECT *
FROM acdataRaw
DISTRIBUTE BY <new-key>

那你CACHE TABLE acdata