如何根据 table 的增长大小修改 Spark read jdbc 中的属性?

How to amend properties in Spark read jdbc according to growing size of table?

我有一个定期将数据从 Postgres 移动到 Redshift 的 Spark 作业。我将 jdbc.read 函数与 lowerBoundupperBound 参数一起使用:

df = spark.read.jdbc(url=jdbc_url, \
          table='some_table',\
          column='id',\
          lowerBound=1,\
          upperBound=20000000, \
          numPartitions=50)

目前 upperBound 是硬编码的,但是 table 的大小每天都在增长,所以我需要以某种方式动态更新 upperBound 值以反映 table 在下一个作业开始时 运行。如何使 upperBound 值等于 table 的当前大小?

您可以在执行主查询之前获取上限值,然后使用它们

query = "(SELECT min({0}), max({0}) FROM {1}) AS temp".format(
    partition_column, table
)

(lower_bound, upper_bound) = (spark.read
    .jdbc(url=url, table=query. properties=properties)
    .first())

df = spark.read.jdbc(url=jdbc_url, \
          table='some_table',\
          column='id',\
          lowerBound=1,\
          upperBound=upper_bound + 10, \
          numPartitions=50)