如何根据 table 的增长大小修改 Spark read jdbc 中的属性?
How to amend properties in Spark read jdbc according to growing size of table?
我有一个定期将数据从 Postgres 移动到 Redshift 的 Spark 作业。我将 jdbc.read
函数与 lowerBound
和 upperBound
参数一起使用:
df = spark.read.jdbc(url=jdbc_url, \
table='some_table',\
column='id',\
lowerBound=1,\
upperBound=20000000, \
numPartitions=50)
目前 upperBound
是硬编码的,但是 table 的大小每天都在增长,所以我需要以某种方式动态更新 upperBound
值以反映 table 在下一个作业开始时 运行。如何使 upperBound
值等于 table 的当前大小?
您可以在执行主查询之前获取上限值,然后使用它们
query = "(SELECT min({0}), max({0}) FROM {1}) AS temp".format(
partition_column, table
)
(lower_bound, upper_bound) = (spark.read
.jdbc(url=url, table=query. properties=properties)
.first())
df = spark.read.jdbc(url=jdbc_url, \
table='some_table',\
column='id',\
lowerBound=1,\
upperBound=upper_bound + 10, \
numPartitions=50)
我有一个定期将数据从 Postgres 移动到 Redshift 的 Spark 作业。我将 jdbc.read
函数与 lowerBound
和 upperBound
参数一起使用:
df = spark.read.jdbc(url=jdbc_url, \
table='some_table',\
column='id',\
lowerBound=1,\
upperBound=20000000, \
numPartitions=50)
目前 upperBound
是硬编码的,但是 table 的大小每天都在增长,所以我需要以某种方式动态更新 upperBound
值以反映 table 在下一个作业开始时 运行。如何使 upperBound
值等于 table 的当前大小?
您可以在执行主查询之前获取上限值,然后使用它们
query = "(SELECT min({0}), max({0}) FROM {1}) AS temp".format(
partition_column, table
)
(lower_bound, upper_bound) = (spark.read
.jdbc(url=url, table=query. properties=properties)
.first())
df = spark.read.jdbc(url=jdbc_url, \
table='some_table',\
column='id',\
lowerBound=1,\
upperBound=upper_bound + 10, \
numPartitions=50)