多个工作线程和数据库同步

Multiple worker threads and Database synchronization

我有多个线程将文件保存在磁盘上并将该信息放入数据库。

在应用程序的另一端,我有多个线程从数据库读取此信息并一次处理一个文件,按 file_id:

排序

SELECT * FROM files_to_process ORDER BY file_id

我发明的是制作一个 PROCESSING_STATUS 列,它有四种状态 NEWPROCESSINGFAILEDSUCCESS.

每个工作人员都应该从数据库中读取 ONLY 一行,按 ID 排序,状态为 NEW 并立即更新为状态 PROCESSING,因此另一个工作人员不会处理相同的文件。

但是,有些事情告诉我,我可能会遇到一些竞争条件。

交易能解决这个问题吗?

不幸的是,我无法在交易中进行所有操作,因为处理文件需要花费大量时间并且交易池将被耗尽,所以我必须按以下顺序进行两次交易。

  1. [交易中] 获取行并更新状态 PROCESSING
  2. [无交易] 处理文件
  3. [交易中]根据结果更新最终状态为SUCCESSFAILED

尝试互斥,简单的例子:

try {
  mutex.acquire();
  try {
    // access and update record to processing
  } finally {
    mutex.release();
  }
} catch(InterruptedException ie) {
  // ...
}

根据您的代码,您可以通过多种方式锁定它,请参阅: Is there a Mutex in Java?

编辑:

抱歉,我认为这是一个 C++ 问题,这是 java 版本

非常恼人的是,UPDATE 在 PostgreSQL 中不使用 LIMIT。

你可以这样做:

update files_to_process set processing_status='PROCESSING' where file_id = (
    SELECT file_id FROM files_to_process 
      WHERE processing_status = 'NEW' 
      ORDER BY file_id FOR UPDATE SKIP LOCKED LIMIT 1
) returning *;

使用这个公式,应该没有竞争条件。您将 运行 在事务中单独使用它(或在自动提交下,仅 运行 语句,它将自动形成自己的事务)。

但我可能不会只使用 'PROCESSING',而是使用 'PROCESSING by machine worker7 PID 19345' 或类似的东西。否则,如果以不干净的方式失败,您如何知道处理何时失败? (这是在一次事务中完成它的好处,失败应该自行回滚)。

Unfortunately I can't make all operation inside transaction since processing files takes a lot of time and transaction pool will be exhausted

但是,您的未完成交易永远不应超过可用于工作的 CPU。除非你有一个非常大的计算场,否则你应该能够使池足够大。但这种方法的最大问题是您无法了解正在发生的事情。

对于两个事务方法,为了性能,您可能需要制作部分索引:

create index on files_to_process (file_id ) where processing_status = 'NEW';

否则你将不得不挖掘所有低 file_id 的已完成的以找到下一个新的,最终会变慢。您可能还需要比默认值更积极地 VACUUM table。