优化和批处理 Parquet/JDBC 连接

Optimizing & batching a Parquet/JDBC join

我正在执行从 S3 拼花数据到 JDBC (Postgres) table 的连接操作,使用拼花数据中的列到 JDBC 的主键table。我需要 JDBC table 中的一小部分(但总体上仍然很大 - 总计数万或数十万行),然后我需要智能地对数据进行分区以供执行程序使用。

作为一个整体,我对数据工程尤其是 Spark 仍然是新手,所以请原谅(并假设!)我的无知。与内存使用相比,我不太关心处理时间;我必须使内存使用符合 Amazon Glue 限制。

执行此操作的好方法是什么?

我现有的想法:

理论上,我可以构造一个 SQL 查询,例如:

select * from t1 where id = key1 UNION
select * from t1 where id = key2 UNION...

但是,这似乎很愚蠢。本题:Selecting multiple rows by ID, is there a faster way than WHERE IN 让我想到将我想要提取的密钥写入临时 table,将其与原始 table 连接起来,然后提取结果;这似乎是执行上述操作的 "correct" 方法。但是,这似乎也是一个很常见的问题,我还没有找到现成的解决方案。

也有可能在 min/max UUID 值之间拉动,但这是我拉动了多少额外行的问题,因为 UUID 是,AFAIK,随机分布在可能的 UUID 值中,我希望得到很多额外的行(在连接期间将被遗漏的行)。不过,这可能是一种对 JDBC 数据进行分区的有用方法。

我也不清楚 JDBC 数据是如何到达执行者的;它可能(完整地)通过驱动程序进程。

因此,尝试将其形式化为问题:

  1. 这个用法有现成的配方吗?
  2. 我应该关注 Spark 的哪些功能来完成此任务?
  3. 来自 JDBC 连接的数据的实际 Spark 数据流是什么?

似乎最好的方法(到目前为止)是将要获取的行 ID 写入数据库上的临时 table,与主 table,然后读出结果(如链接答案中所述)。

从理论上讲,这在 Spark 中是完全可行的;像

// PSUEDOCODE!
df.select("row_id").write.jdbc(<target db>, "ids_to_fetch")
databaseConnection.execute("create table output from (select * from ids_to_fetch join target_table on row_id = id)")
df = df.join(
  spark.read.jdbc(<target db>, "output")
)

这可能是最有效的方法,因为(据我所知)它会将 ID 的写入 的读取加入 table 输出给执行者,而不是试图在驱动程序中做任何事情。

但是,现在我无法将临时 table 写入目标数据库,所以我在驱动程序中生成一系列 select where in 语句,然后提取结果那些。

Spark 并不是为了在驱动程序上执行任何高性能操作而设计的,最好避免它。

对于您的情况,我建议先将数据从 S3 加载到某个 DF。保留此数据框,因为稍后需要它。

然后您可以使用 map(row->).distinct()

的组合来解析来自 S3 的键的唯一值

然后对以上键进行分区,每个分区中的键数量合理,以便对 JDBC 进行单个查询。你也可以坚持上面的结果,然后执行 count() 操作,然后 repartition()。例如,单个分区中的项目不超过 1000 个。

然后使用 mapPartitions,编写一个类似 'SELECT * FROM table WHERE key in ' 的查询。

然后使用 spark flatMap 需要执行实际的 selects。我不知道数据帧的自动方式,所以您可能需要直接使用 JDBC 来执行 select 并映射数据。您可以在工作机器上初始化一个 spring 框架,并使用 spring 数据扩展来轻松地将数据从数据库加载到某些实体列表。

现在您在集群中拥有了包含来自 Postgres 的所需数据的 DatasSet。您可以通过 toDF() 从中创建一个数据框。此处可能需要对列进行一些额外的映射,或者将数据映射到上一步中的 Row 类型。

所以,现在您有 2 个必需的数据帧,一个初始使用来自 S3 的数据持久化,另一个使用来自 Postgres 的数据,您可以使用 Dataframe.join[以标准方式加入它们=38=].

注意:不要忘记使用 .persist() 持久化数据集和框架,当它们将被重用时。否则每次都会重复所有的数据检索步骤。