Postgresql FOR UPDATE SKIP LOCKED 仍然选择重复的行

Postgresql FOR UPDATE SKIP LOCKED still selects duplicated rows

我正在使用 PostgreSQL 作为作业队列。以下是我检索作业并更新其状态的查询:

        UPDATE requests AS re
        SET
          started_at = NOW(),
          finished_at = NULL
        FROM (
          SELECT
            _re.*
          FROM requests AS _re
          WHERE
            _re.state = 'pending'
          AND
            _re.started_at IS NULL
          LIMIT 1
          FOR UPDATE SKIP LOCKED
        ) AS sub
        WHERE re.id = sub.id
        RETURNING
          sub.*

现在,我有几台机器,每台机器上有 1 个进程和多个线程,每个线程上有一个工作线程。同一进程中的所有工作人员共享一个连接池,通常有 10 - 20 个连接。

问题是,上面的查询会 return 一些行不止一次!

我找不到任何理由。有人可以帮忙吗?

更详细地说,我正在使用 Python3 和 psycopg2。


更新:

我试过@a_horse_with_no_name的回答,但似乎不行。

我注意到,一个请求被两个查询检索到 started_at 更新为:

2016-04-21 14:23:06.970897+08

2016-04-21 14:23:06.831345+08

只相差0.14秒

我想知道在这两个连接执行内部 SELECT 子查询时,两个锁是否还没有建立?


更新:

更准确地说,我在 1 台机器上的 1 个进程中有 200 个工人(即 200 个线程)。

另请注意,如果您不希望每个线程互相妨碍,则每个线程都有自己的连接是必不可少的。

If your application uses multiple threads of execution, they cannot share a connection concurrently. You must either explicitly control access to the connection (using mutexes) or use a connection for each thread. If each thread uses its own connection, you will need to use the AT clause to specify which connection the thread will use.

来自:http://www.postgresql.org/docs/9.5/static/ecpg-connect.html

如果两个线程共享同一个连接,就会发生各种奇怪的事情。我相信这就是您的情况。如果您使用一个连接锁定,则使用同一连接的所有其他线程都可以访问锁定的对象。

请允许我提出一个替代方法,它非常简单。使用redis作为队列。您可以简单地使用 redis-py 和 lpush/rpop 方法或使用 python-rq。

有可能在 select 时尚未发出锁定事务,或者在 select 的结果准备好和更新语句时锁定丢失开始。您是否尝试过明确开始交易?

BEGIN;
  WITH req AS (
    SELECT id
    FROM requests AS _re
    WHERE _re.state = 'pending' AND _re.started_at IS NULL
    LIMIT 1 FOR UPDATE SKIP LOCKED
    )
  UPDATE requests SET started_at = NOW(), finished_at = NULL
  FROM req
  WHERE requests.id = req.id;
COMMIT;