MySQL 中并发工作人员的原子读取和更新
Atomic read and update in MySQL with concurrent workers
假设我有多个工作人员可以同时读写 MySQL table(例如 jobs
)。每个工人的任务是:
- 找到最早的
QUEUED
工作
- 将其状态设置为
RUNNING
- Return对应的ID。
请注意,当工作人员运行第 1 步时,可能 没有 任何符合条件的(即 QUEUED
)工作。
到目前为止我有以下伪代码。如果第 1 步 returns 没有工作,我相信我需要取消 (ROLLBACK
) 交易。我将如何在下面的代码中做到这一点?
BEGIN TRANSACTION;
# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED"
ORDER BY created_at ASC LIMIT 1;
# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>
COMMIT;
目前还不是很清楚你在追求什么。但假设您的任务是:找到下一份 QUEUED
工作。将其状态设置为 RUNNING
和 select 相应的 ID。
在单线程环境中,您可以只使用您的代码。将 selected ID 提取到应用程序代码中的变量中,并将其传递给 WHERE 子句中的 UPDATE 查询。您甚至不需要交易,因为只有一份书面声明。您可以在 SQLscript 中模仿。
假设这是您当前的状态:
| id | created_at | status |
| --- | ------------------- | -------- |
| 1 | 2020-06-15 12:00:00 | COMLETED |
| 2 | 2020-06-15 12:00:10 | QUEUED |
| 3 | 2020-06-15 12:00:20 | QUEUED |
| 4 | 2020-06-15 12:00:30 | QUEUED |
您想开始下一个排队的作业(id=2)。
SET @id_for_update = (
SELECT id
FROM jobs
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1
);
UPDATE jobs
SET status="RUNNING"
WHERE id = @id_for_update;
SELECT @id_for_update;
你会得到
@id_for_update
2
从最后一个 select。 table 将具有此状态:
| id | created_at | status |
| --- | ------------------- | -------- |
| 1 | 2020-06-15 12:00:00 | COMLETED |
| 2 | 2020-06-15 12:00:10 | RUNNING |
| 3 | 2020-06-15 12:00:20 | QUEUED |
| 4 | 2020-06-15 12:00:30 | QUEUED |
如果您有多个启动作业的进程,则需要使用 FOR UPDATE
锁定该行。但这可以避免使用 LAST_INSERT_ID()
:
从上面的状态开始,作业 2 已经 运行:
UPDATE jobs
SET status = 'RUNNING',
id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;
SELECT LAST_INSERT_ID();
您将获得:
| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3 | 1 |
新状态是:
| id | created_at | status |
| --- | ------------------- | -------- |
| 1 | 2020-06-15 12:00:00 | COMLETED |
| 2 | 2020-06-15 12:00:10 | RUNNING |
| 3 | 2020-06-15 12:00:20 | RUNNING |
| 4 | 2020-06-15 12:00:30 | QUEUED |
如果 UPDATE 语句不影响任何行(没有排队的行)ROW_COUNT()
将是 0
。
可能存在一些我不知道的风险 - 但这也不是我的处理方式。我宁愿在 jobs
table 中存储更多信息。简单例子:
CREATE TABLE jobs (
id INT auto_increment primary key,
created_at timestamp not null default now(),
updated_at timestamp not null default now() on update now(),
status varchar(50) not null default 'QUEUED',
process_id varchar(50) null default null
);
和
UPDATE jobs
SET status = 'RUNNING',
process_id = 'some_unique_pid'
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;
现在 运行 作业属于特定进程,您可以 select 它与
SELECT * FROM jobs WHERE process_id = 'some_unique_pid';
您甚至可能想了解更多信息 - 例如。 queued_at
, started_at
, finished_at
.
我这周正在实施与您的案例非常相似的事情。许多工作人员,每个工作人员抓取一组行中的 "next" 行进行处理。
伪代码是这样的:
BEGIN;
SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;
UPDATE mytable SET status = 'RUNNING' WHERE id = @id;
COMMIT;
使用FOR UPDATE
对于避免竞争条件很重要,即不止一个工人试图抢同一行。
有关 SELECT ... INTO
的信息,请参阅 https://dev.mysql.com/doc/refman/8.0/en/select-into.html。
假设我有多个工作人员可以同时读写 MySQL table(例如 jobs
)。每个工人的任务是:
- 找到最早的
QUEUED
工作 - 将其状态设置为
RUNNING
- Return对应的ID。
请注意,当工作人员运行第 1 步时,可能 没有 任何符合条件的(即 QUEUED
)工作。
到目前为止我有以下伪代码。如果第 1 步 returns 没有工作,我相信我需要取消 (ROLLBACK
) 交易。我将如何在下面的代码中做到这一点?
BEGIN TRANSACTION;
# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED"
ORDER BY created_at ASC LIMIT 1;
# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>
COMMIT;
目前还不是很清楚你在追求什么。但假设您的任务是:找到下一份 QUEUED
工作。将其状态设置为 RUNNING
和 select 相应的 ID。
在单线程环境中,您可以只使用您的代码。将 selected ID 提取到应用程序代码中的变量中,并将其传递给 WHERE 子句中的 UPDATE 查询。您甚至不需要交易,因为只有一份书面声明。您可以在 SQLscript 中模仿。
假设这是您当前的状态:
| id | created_at | status |
| --- | ------------------- | -------- |
| 1 | 2020-06-15 12:00:00 | COMLETED |
| 2 | 2020-06-15 12:00:10 | QUEUED |
| 3 | 2020-06-15 12:00:20 | QUEUED |
| 4 | 2020-06-15 12:00:30 | QUEUED |
您想开始下一个排队的作业(id=2)。
SET @id_for_update = (
SELECT id
FROM jobs
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1
);
UPDATE jobs
SET status="RUNNING"
WHERE id = @id_for_update;
SELECT @id_for_update;
你会得到
@id_for_update
2
从最后一个 select。 table 将具有此状态:
| id | created_at | status |
| --- | ------------------- | -------- |
| 1 | 2020-06-15 12:00:00 | COMLETED |
| 2 | 2020-06-15 12:00:10 | RUNNING |
| 3 | 2020-06-15 12:00:20 | QUEUED |
| 4 | 2020-06-15 12:00:30 | QUEUED |
如果您有多个启动作业的进程,则需要使用 FOR UPDATE
锁定该行。但这可以避免使用 LAST_INSERT_ID()
:
从上面的状态开始,作业 2 已经 运行:
UPDATE jobs
SET status = 'RUNNING',
id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;
SELECT LAST_INSERT_ID();
您将获得:
| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3 | 1 |
新状态是:
| id | created_at | status |
| --- | ------------------- | -------- |
| 1 | 2020-06-15 12:00:00 | COMLETED |
| 2 | 2020-06-15 12:00:10 | RUNNING |
| 3 | 2020-06-15 12:00:20 | RUNNING |
| 4 | 2020-06-15 12:00:30 | QUEUED |
如果 UPDATE 语句不影响任何行(没有排队的行)ROW_COUNT()
将是 0
。
可能存在一些我不知道的风险 - 但这也不是我的处理方式。我宁愿在 jobs
table 中存储更多信息。简单例子:
CREATE TABLE jobs (
id INT auto_increment primary key,
created_at timestamp not null default now(),
updated_at timestamp not null default now() on update now(),
status varchar(50) not null default 'QUEUED',
process_id varchar(50) null default null
);
和
UPDATE jobs
SET status = 'RUNNING',
process_id = 'some_unique_pid'
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;
现在 运行 作业属于特定进程,您可以 select 它与
SELECT * FROM jobs WHERE process_id = 'some_unique_pid';
您甚至可能想了解更多信息 - 例如。 queued_at
, started_at
, finished_at
.
我这周正在实施与您的案例非常相似的事情。许多工作人员,每个工作人员抓取一组行中的 "next" 行进行处理。
伪代码是这样的:
BEGIN;
SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;
UPDATE mytable SET status = 'RUNNING' WHERE id = @id;
COMMIT;
使用FOR UPDATE
对于避免竞争条件很重要,即不止一个工人试图抢同一行。
有关 SELECT ... INTO
的信息,请参阅 https://dev.mysql.com/doc/refman/8.0/en/select-into.html。