MySQL 中并发工作人员的原子读取和更新

Atomic read and update in MySQL with concurrent workers

假设我有多个工作人员可以同时读写 MySQL table(例如 jobs)。每个工人的任务是:

  1. 找到最早的 QUEUED 工作
  2. 将其状态设置为 RUNNING
  3. Return对应的ID。

请注意,当工作人员运行第 1 步时,可能 没有 任何符合条件的(即 QUEUED)工作。

到目前为止我有以下伪代码。如果第 1 步 returns 没有工作,我相信我需要取消 (ROLLBACK) 交易。我将如何在下面的代码中做到这一点?

BEGIN TRANSACTION;

# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>

COMMIT;

目前还不是很清楚你在追求什么。但假设您的任务是:找到下一份 QUEUED 工作。将其状态设置为 RUNNING 和 select 相应的 ID。

在单线程环境中,您可以只使用您的代码。将 selected ID 提取到应用程序代码中的变量中,并将其传递给 WHERE 子句中的 UPDATE 查询。您甚至不需要交易,因为只有一份书面声明。您可以在 SQLscript 中模仿。

假设这是您当前的状态:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | QUEUED   |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

您想开始下一个排队的作业(id=2)。

SET @id_for_update = (
  SELECT id
  FROM jobs
  WHERE status = 'QUEUED'
  ORDER BY id
  LIMIT 1
);

UPDATE jobs
SET status="RUNNING"
WHERE id = @id_for_update;

SELECT @id_for_update;

你会得到

@id_for_update
2

从最后一个 select。 table 将具有此状态:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

View on DB Fiddle

如果您有多个启动作业的进程,则需要使用 FOR UPDATE 锁定该行。但这可以避免使用 LAST_INSERT_ID():

从上面的状态开始,作业 2 已经 运行:

UPDATE jobs
SET status = 'RUNNING',
    id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

SELECT LAST_INSERT_ID();

您将获得:

| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3                | 1           |

新状态是:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | RUNNING  |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

View on DB Fiddle

如果 UPDATE 语句不影响任何行(没有排队的行)ROW_COUNT() 将是 0

可能存在一些我不知道的风险 - 但这也不是我的处理方式。我宁愿在 jobs table 中存储更多信息。简单例子:

CREATE TABLE jobs (
  id INT auto_increment primary key,
  created_at timestamp not null default now(),
  updated_at timestamp not null default now() on update now(),
  status varchar(50) not null default 'QUEUED',
  process_id varchar(50) null default null
);

UPDATE jobs
SET status = 'RUNNING',
    process_id = 'some_unique_pid'    
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

现在 运行 作业属于特定进程,您可以 select 它与

SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

您甚至可能想了解更多信息 - 例如。 queued_at, started_at, finished_at.

我这周正在实施与您的案例非常相似的事情。许多工作人员,每个工作人员抓取一组行中的 "next" 行进行处理。

伪代码是这样的:

BEGIN;

SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;

UPDATE mytable SET status = 'RUNNING' WHERE id = @id;

COMMIT;

使用FOR UPDATE对于避免竞争条件很重要,即不止一个工人试图抢同一行。

有关 SELECT ... INTO 的信息,请参阅 https://dev.mysql.com/doc/refman/8.0/en/select-into.html