如何使用 Apache Beam 管理背压

How to manage backpressure with Apache Beam

我有非常基本的 apache beam 管道,它在 GCP 数据流上运行并从 PubSub 读取一些数据,对其进行转换并将其写入 Postgres 数据库。所有这些都是使用 Apache Beam 的标准 readers/writers 组件完成的。问题是当我的管道开始接收大量数据时,我的 Postgres 端由于等待 ShareLocks 而出现死锁错误。

很明显,这样的事情是因为Postgres端溢出造成的。我的管道试图一次写得太快和太多东西,所以为了避免这种情况,它应该放慢速度。因此,我们可以使用诸如背压之类的机制。我试图挖掘出有关 Apache Beam 背压配置的任何信息,不幸的是,官方文档似乎对此类问题保持沉默。

我对以下类型的异常感到不知所措:

java.sql.BatchUpdateException: Batch entry <NUMBER>
<MY_STATEMENT>
 was aborted: ERROR: deadlock detected
  Detail: Process 87768 waits for ShareLock on transaction 1939992; blocked by process 87769.
Process 87769 waits for ShareLock on transaction 1939997; blocked by process 87768.
  Hint: See server log for query details.
  Where: while inserting index tuple (5997152,9) in relation "<MY_TABLE>"  Call getNextException to see other errors in the batch.

我想知道是否有任何背压工具包或类似的东西可以帮助我在不编写自己的情况下管理我的问题PostgresIO.Writer

非常感谢。

假设你使用JdbcIO写入Postgres,你可以尝试增加batch size(参见withBatchSize(long batchSize)),默认是1K条记录,可能是不够的。

此外,如果出现 SQL 异常,并且您想重试,则需要确保使用正确的重试策略(请参阅 withRetryStrategy(RetryStrategy retryStrategy))。在这种情况下,将应用 FluentBackoff