使用数据库事务确保只有一个进程从作业 table 中获取作业
Using database transactions to ensure only one process picks up a job from jobs table
我想确保 只有一个 工作节点(节点集群中的)从作业 table 中获取任何给定作业并处理它。我在我的工作 table 上使用以下数据库事务来实现此目的:
BEGIN TRANSACTION;
SELECT id FROM jobs
WHERE status = 'NEW'
ORDER BY created_at
LIMIT 1;
-- rollback if id is null
UPDATE jobs
SET status = 'IN_PROCESS'
WHERE id = 'id';
COMMIT;
上述 TRANSACTION 能否确保只有一个节点 会接受任何给定的工作?有没有可能两个节点同时读取SELECT语句作为NEW,然后都运行UPDATE语句(在第一个释放行锁之后) 并开始处理 相同的作业?
换句话说,TRANSACTION 是否也会为 SELECT 语句提供锁,还是仅为 UPDATE 语句提供锁?
不,事务在这里帮不了你,除非你将隔离级别提高到 SERIALIZABLE
。这是万不得已的选择,但如果可能的话我会避免它。
我看到的可能性是:
悲观锁。将 FOR UPDATE
添加到 SELECT
。这些限制了性能。
乐观锁。似乎最适合您的需求。
使用链接到您的 table 的预定义队列管理器。
实现一个队列,或者一个队列进程。
在我看来,#2 最适合它。如果是这种情况,您需要向 table 添加一个额外的列:version
。添加该列后,您的查询将需要更改为:
SELECT id, version FROM jobs
WHERE status = 'NEW'
ORDER BY created_at
LIMIT 1;
UPDATE jobs
SET status = 'IN_PROCESS', version = version + 1
WHERE id = 'id_retrieved_before' and version = 'version_retrieved_before';
上面的更新 returns 更新的行数。如果计数是 1
那么这个线程得到了行。如果它是 0
,那么另一个竞争线程获得了该行,您将需要再次重试此策略。
如您所见:
- 不需要交易。
- 所有引擎 return 更新的行数,因此该策略几乎适用于任何数据库。
- 不需要锁。这提供了很好的性能。
- 缺点是如果另一个线程得到了行,逻辑需要从头开始。没什么大不了的,但在竞争线程多的场景下,不能保证最佳响应时间。
最后,如果您的数据库是 PostgreSQL,您可以将两个 SQL 语句打包成一个。 PostgreSQL 是不是很棒?
我想确保 只有一个 工作节点(节点集群中的)从作业 table 中获取任何给定作业并处理它。我在我的工作 table 上使用以下数据库事务来实现此目的:
BEGIN TRANSACTION;
SELECT id FROM jobs
WHERE status = 'NEW'
ORDER BY created_at
LIMIT 1;
-- rollback if id is null
UPDATE jobs
SET status = 'IN_PROCESS'
WHERE id = 'id';
COMMIT;
上述 TRANSACTION 能否确保只有一个节点 会接受任何给定的工作?有没有可能两个节点同时读取SELECT语句作为NEW,然后都运行UPDATE语句(在第一个释放行锁之后) 并开始处理 相同的作业?
换句话说,TRANSACTION 是否也会为 SELECT 语句提供锁,还是仅为 UPDATE 语句提供锁?
不,事务在这里帮不了你,除非你将隔离级别提高到 SERIALIZABLE
。这是万不得已的选择,但如果可能的话我会避免它。
我看到的可能性是:
悲观锁。将
FOR UPDATE
添加到SELECT
。这些限制了性能。乐观锁。似乎最适合您的需求。
使用链接到您的 table 的预定义队列管理器。
实现一个队列,或者一个队列进程。
在我看来,#2 最适合它。如果是这种情况,您需要向 table 添加一个额外的列:version
。添加该列后,您的查询将需要更改为:
SELECT id, version FROM jobs
WHERE status = 'NEW'
ORDER BY created_at
LIMIT 1;
UPDATE jobs
SET status = 'IN_PROCESS', version = version + 1
WHERE id = 'id_retrieved_before' and version = 'version_retrieved_before';
上面的更新 returns 更新的行数。如果计数是 1
那么这个线程得到了行。如果它是 0
,那么另一个竞争线程获得了该行,您将需要再次重试此策略。
如您所见:
- 不需要交易。
- 所有引擎 return 更新的行数,因此该策略几乎适用于任何数据库。
- 不需要锁。这提供了很好的性能。
- 缺点是如果另一个线程得到了行,逻辑需要从头开始。没什么大不了的,但在竞争线程多的场景下,不能保证最佳响应时间。
最后,如果您的数据库是 PostgreSQL,您可以将两个 SQL 语句打包成一个。 PostgreSQL 是不是很棒?