多个工作线程和数据库同步
Multiple worker threads and Database synchronization
我有多个线程将文件保存在磁盘上并将该信息放入数据库。
在应用程序的另一端,我有多个线程从数据库读取此信息并一次处理一个文件,按 file_id
:
排序
SELECT * FROM files_to_process ORDER BY file_id
我发明的是制作一个 PROCESSING_STATUS
列,它有四种状态 NEW
、PROCESSING
、FAILED
、SUCCESS
.
每个工作人员都应该从数据库中读取 ONLY 一行,按 ID 排序,状态为 NEW
并立即更新为状态 PROCESSING
,因此另一个工作人员不会处理相同的文件。
但是,有些事情告诉我,我可能会遇到一些竞争条件。
交易能解决这个问题吗?
不幸的是,我无法在交易中进行所有操作,因为处理文件需要花费大量时间并且交易池将被耗尽,所以我必须按以下顺序进行两次交易。
- [交易中] 获取行并更新状态
PROCESSING
- [无交易] 处理文件
- [交易中]根据结果更新最终状态为
SUCCESS
或FAILED
尝试互斥,简单的例子:
try {
mutex.acquire();
try {
// access and update record to processing
} finally {
mutex.release();
}
} catch(InterruptedException ie) {
// ...
}
根据您的代码,您可以通过多种方式锁定它,请参阅:
Is there a Mutex in Java?
编辑:
抱歉,我认为这是一个 C++ 问题,这是 java 版本
非常恼人的是,UPDATE 在 PostgreSQL 中不使用 LIMIT。
你可以这样做:
update files_to_process set processing_status='PROCESSING' where file_id = (
SELECT file_id FROM files_to_process
WHERE processing_status = 'NEW'
ORDER BY file_id FOR UPDATE SKIP LOCKED LIMIT 1
) returning *;
使用这个公式,应该没有竞争条件。您将 运行 在事务中单独使用它(或在自动提交下,仅 运行 语句,它将自动形成自己的事务)。
但我可能不会只使用 'PROCESSING',而是使用 'PROCESSING by machine worker7 PID 19345' 或类似的东西。否则,如果以不干净的方式失败,您如何知道处理何时失败? (这是在一次事务中完成它的好处,失败应该自行回滚)。
Unfortunately I can't make all operation inside transaction since processing files takes a lot of time and transaction pool will be exhausted
但是,您的未完成交易永远不应超过可用于工作的 CPU。除非你有一个非常大的计算场,否则你应该能够使池足够大。但这种方法的最大问题是您无法了解正在发生的事情。
对于两个事务方法,为了性能,您可能需要制作部分索引:
create index on files_to_process (file_id ) where processing_status = 'NEW';
否则你将不得不挖掘所有低 file_id 的已完成的以找到下一个新的,最终会变慢。您可能还需要比默认值更积极地 VACUUM table。
我有多个线程将文件保存在磁盘上并将该信息放入数据库。
在应用程序的另一端,我有多个线程从数据库读取此信息并一次处理一个文件,按 file_id
:
SELECT * FROM files_to_process ORDER BY file_id
我发明的是制作一个 PROCESSING_STATUS
列,它有四种状态 NEW
、PROCESSING
、FAILED
、SUCCESS
.
每个工作人员都应该从数据库中读取 ONLY 一行,按 ID 排序,状态为 NEW
并立即更新为状态 PROCESSING
,因此另一个工作人员不会处理相同的文件。
但是,有些事情告诉我,我可能会遇到一些竞争条件。
交易能解决这个问题吗?
不幸的是,我无法在交易中进行所有操作,因为处理文件需要花费大量时间并且交易池将被耗尽,所以我必须按以下顺序进行两次交易。
- [交易中] 获取行并更新状态
PROCESSING
- [无交易] 处理文件
- [交易中]根据结果更新最终状态为
SUCCESS
或FAILED
尝试互斥,简单的例子:
try {
mutex.acquire();
try {
// access and update record to processing
} finally {
mutex.release();
}
} catch(InterruptedException ie) {
// ...
}
根据您的代码,您可以通过多种方式锁定它,请参阅: Is there a Mutex in Java?
编辑:
抱歉,我认为这是一个 C++ 问题,这是 java 版本
非常恼人的是,UPDATE 在 PostgreSQL 中不使用 LIMIT。
你可以这样做:
update files_to_process set processing_status='PROCESSING' where file_id = (
SELECT file_id FROM files_to_process
WHERE processing_status = 'NEW'
ORDER BY file_id FOR UPDATE SKIP LOCKED LIMIT 1
) returning *;
使用这个公式,应该没有竞争条件。您将 运行 在事务中单独使用它(或在自动提交下,仅 运行 语句,它将自动形成自己的事务)。
但我可能不会只使用 'PROCESSING',而是使用 'PROCESSING by machine worker7 PID 19345' 或类似的东西。否则,如果以不干净的方式失败,您如何知道处理何时失败? (这是在一次事务中完成它的好处,失败应该自行回滚)。
Unfortunately I can't make all operation inside transaction since processing files takes a lot of time and transaction pool will be exhausted
但是,您的未完成交易永远不应超过可用于工作的 CPU。除非你有一个非常大的计算场,否则你应该能够使池足够大。但这种方法的最大问题是您无法了解正在发生的事情。
对于两个事务方法,为了性能,您可能需要制作部分索引:
create index on files_to_process (file_id ) where processing_status = 'NEW';
否则你将不得不挖掘所有低 file_id 的已完成的以找到下一个新的,最终会变慢。您可能还需要比默认值更积极地 VACUUM table。