在 Neo4J Cypher 中实现并行批处理队列
Implement a parallel batch processing queue in Neo4J Cypher
当我使用 Informix SQL 时,可以使用简单更新的事务隔离来创建类似这样的处理队列:
UPDATE Queue
SET processed_by = ?
WHERE processed_by IS NULL
ORDER BY inserted_at
LIMIT 3
这在第一个参数(?
,例如服务器名称和线程编号)中标记最多 3 个项目(队列可能为空)由处理器处理。 UPDATE
在每个并行处理器(单独的线程或机器)上完成后,队列被分片:
+-----+------------------+---------------+
| _id | inserted_at | processed_by |
+-----+------------------+---------------+
| 1 | 2017-03-07 01:15 | host2:thread3 |
| 2 | 2017-03-07 01:16 | host2:thread3 |
| 3 | 2017-03-07 01:17 | host2:thread3 |
| 4 | 2017-03-07 01:18 | host1:thread1 |
| 5 | 2017-03-07 01:19 | host1:thread1 |
| 6 | 2017-03-07 01:20 | host1:thread1 |
| 7 | 2017-03-07 01:21 | NULL |
+-----+------------------+---------------+
并且每个处理器都可以单独处理一个批处理:
SELECT * from Queue WHERE processed_by = ?
虽然他们可以确定没有其他人接触他们要处理的物品。作为一个副作用,如果一个处理器死了,他们很容易从他们离开的地方继续。
我正在尝试使用 Cypher 实现相同的目的,但是当并行读取观察到相同的状态并因此更新相同的项目时,我 运行 遇到了问题:
MATCH (n:Queue)
WHERE NOT exists(n.processed_by)
SET n.processed_by = $processor
RETURN n
LIMIT 3
在此基础上,我能够lock items并用这个进行处理:
MATCH (n:Queue)
SET n._LOCK_ = true
WITH n
WHERE NOT exists(n.processed_by)
SET n.processed_by = $processor
REMOVE n._LOCK_
RETURN n
LIMIT 3
问题是 _LOCK_
应用于所有队列节点,而不仅仅是其中的 3 个,如果它们很多,这可能会导致性能下降。如果有时由于其他处理器锁定了所有可用项目,处理器会取回 0 个锁定项目,这不会有问题。
用 Cypher 表达这个的正确方法是什么?
如果可能的话,我还想包括排序(参见 SQL 中的 inserted_at
)。
我也很乐意让它不受限制地工作,所以只标记一个没有冲突的项目。
注意:我希望这可以在不使用任何服务器端扩展的情况下实现。
[更新]
您的第一个查询的问题是,如果另一个 thread/process 已经锁定了一个节点(通过设置一个节点 属性;在本例中为 processed_by
),尝试仅在同一节点上设置写锁暂时 阻止您的thread/process 代码继续进行。一旦另一个 thread/process 完成它的处理,它的写锁被释放,你的代码将继续设置 processed_by
-- 这会覆盖另一个 thread/process 之前写的!
如您所知,您的第二个查询的问题在于它锁定了所有 Queue
节点,这阻止了您可以通过使用多个 threads/processes.[=20= 获得的任何性能改进。 ]
这可能对你有用:
MATCH (n:Queue)
WHERE NOT EXISTS(n.processed_by)
WITH n LIMIT 3
SET n._LOCK_ = true
WITH COLLECT(n) AS lockedNodes
WITH REDUCE(s = [], n IN lockedNodes |
CASE WHEN NOT EXISTS(n.processed_by) THEN s + n ELSE s END ) AS nodesToSet, lockedNodes
FOREACH(x IN nodesToSet | SET x.processed_by = $processor)
FOREACH(x IN lockedNodes | REMOVE x._LOCK_)
RETURN nodesToSet, lockedNodes;
此查询首先获取最多 3 个没有 processed_by
属性 的节点,并尝试在每个节点上设置写锁(通过设置它们的 _LOCK_
属性)。这避免锁定所有 Queue
个节点。
如果其他 threads/processes 已经在 1 个或多个相同节点上拥有写锁,则您的 thread/process 将被阻止,直到这些锁被释放。在您的 thread/process 获得所有写锁后,其中一些节点上的 processed_by
属性 可能已被其他 threads/processes 设置。因此,此查询再次测试 processed_by
属性 是否存在,并且仅在仍然不存在时才设置它。 RETURN
子句 returns 不仅是你更改的节点的集合,也是最初找到并锁定的节点的集合(可以比你更改的节点的集合更大)。
如果锁定队列节点的唯一原因是写入 processed_by
属性,您可以考虑锁定指定为 :QueueLock 的其他节点。它的唯一目的是被锁定,以便您可以执行队列操作。
这当然需要所有队列操作(用于标记为 processed_by
,并且可能用于任何可能影响它的操作)锁定 :QueueLock。
您还需要在 SET 操作之前限制您的节点,因为在您当前的查询中,您只限制了节点的 return(它是 returning 3,但是在此之前,它将所有未分配的节点分配给单个处理器)。
MATCH (lock:QueueLock)
SET lock._LOCK_ = true
MATCH (n:Queue)
WITH n
WHERE NOT exists(n.processed_by)
LIMIT 3
SET n.processed_by = $processor
REMOVE lock._LOCK_
RETURN n
但是,在您的节点上使用不同的标签可能会有所帮助,这样在分配的节点上的操作就不必等待锁定。
如果您为排队的未分配节点保留 :Queue(因此当节点在队列中时 processed_by
将永远不存在),并且为要处理的已分配节点保留 :Assigned,这应该适合您:
MATCH (lock:QueueLock)
SET lock._LOCK_ = true
MATCH (n:Queue)
WITH n
LIMIT 3
SET n.processed_by = $processor
REMOVE n:Queue
SET n:Assigned
REMOVE lock._LOCK_
RETURN n
我用作最终代码(包括排序)的 @cybersam's 的另一个变体:
// find all of the queued items
MATCH (n:Queue)
// that haven't been marked yet
WHERE NOT exists(n.processed_by)
// take the first $count of those and lock them
WITH n
ORDER BY n.something ASCENDING
LIMIT $count
SET n._LOCK_ = true
// write lock will be released after RETURN "commits", but clean up sooner
REMOVE n._LOCK_
// after locking check if they're still unmarked
// (between the first check and locking they could have been marked by others)
WITH n
WHERE NOT exists(n.processed_by)
// and mark these unmarked ones as ours (count(n) <= than original limit)
SET n.processed_by = $processor, n.processing_started = timestamp()
RETURN n;
根据我的测试,行为是相同的,但不需要命令式 Cypher。
我相信通过对变量名 (WITH n as x
) 的一些操作,也可以得到要返回的 "locked, but not marked" 节点,但我并不需要这些实现细节作为结果查询。
当我使用 Informix SQL 时,可以使用简单更新的事务隔离来创建类似这样的处理队列:
UPDATE Queue
SET processed_by = ?
WHERE processed_by IS NULL
ORDER BY inserted_at
LIMIT 3
这在第一个参数(?
,例如服务器名称和线程编号)中标记最多 3 个项目(队列可能为空)由处理器处理。 UPDATE
在每个并行处理器(单独的线程或机器)上完成后,队列被分片:
+-----+------------------+---------------+
| _id | inserted_at | processed_by |
+-----+------------------+---------------+
| 1 | 2017-03-07 01:15 | host2:thread3 |
| 2 | 2017-03-07 01:16 | host2:thread3 |
| 3 | 2017-03-07 01:17 | host2:thread3 |
| 4 | 2017-03-07 01:18 | host1:thread1 |
| 5 | 2017-03-07 01:19 | host1:thread1 |
| 6 | 2017-03-07 01:20 | host1:thread1 |
| 7 | 2017-03-07 01:21 | NULL |
+-----+------------------+---------------+
并且每个处理器都可以单独处理一个批处理:
SELECT * from Queue WHERE processed_by = ?
虽然他们可以确定没有其他人接触他们要处理的物品。作为一个副作用,如果一个处理器死了,他们很容易从他们离开的地方继续。
我正在尝试使用 Cypher 实现相同的目的,但是当并行读取观察到相同的状态并因此更新相同的项目时,我 运行 遇到了问题:
MATCH (n:Queue)
WHERE NOT exists(n.processed_by)
SET n.processed_by = $processor
RETURN n
LIMIT 3
在此基础上,我能够lock items并用这个进行处理:
MATCH (n:Queue)
SET n._LOCK_ = true
WITH n
WHERE NOT exists(n.processed_by)
SET n.processed_by = $processor
REMOVE n._LOCK_
RETURN n
LIMIT 3
问题是 _LOCK_
应用于所有队列节点,而不仅仅是其中的 3 个,如果它们很多,这可能会导致性能下降。如果有时由于其他处理器锁定了所有可用项目,处理器会取回 0 个锁定项目,这不会有问题。
用 Cypher 表达这个的正确方法是什么?
如果可能的话,我还想包括排序(参见 SQL 中的 inserted_at
)。
我也很乐意让它不受限制地工作,所以只标记一个没有冲突的项目。
注意:我希望这可以在不使用任何服务器端扩展的情况下实现。
[更新]
您的第一个查询的问题是,如果另一个 thread/process 已经锁定了一个节点(通过设置一个节点 属性;在本例中为 processed_by
),尝试仅在同一节点上设置写锁暂时 阻止您的thread/process 代码继续进行。一旦另一个 thread/process 完成它的处理,它的写锁被释放,你的代码将继续设置 processed_by
-- 这会覆盖另一个 thread/process 之前写的!
如您所知,您的第二个查询的问题在于它锁定了所有 Queue
节点,这阻止了您可以通过使用多个 threads/processes.[=20= 获得的任何性能改进。 ]
这可能对你有用:
MATCH (n:Queue)
WHERE NOT EXISTS(n.processed_by)
WITH n LIMIT 3
SET n._LOCK_ = true
WITH COLLECT(n) AS lockedNodes
WITH REDUCE(s = [], n IN lockedNodes |
CASE WHEN NOT EXISTS(n.processed_by) THEN s + n ELSE s END ) AS nodesToSet, lockedNodes
FOREACH(x IN nodesToSet | SET x.processed_by = $processor)
FOREACH(x IN lockedNodes | REMOVE x._LOCK_)
RETURN nodesToSet, lockedNodes;
此查询首先获取最多 3 个没有 processed_by
属性 的节点,并尝试在每个节点上设置写锁(通过设置它们的 _LOCK_
属性)。这避免锁定所有 Queue
个节点。
如果其他 threads/processes 已经在 1 个或多个相同节点上拥有写锁,则您的 thread/process 将被阻止,直到这些锁被释放。在您的 thread/process 获得所有写锁后,其中一些节点上的 processed_by
属性 可能已被其他 threads/processes 设置。因此,此查询再次测试 processed_by
属性 是否存在,并且仅在仍然不存在时才设置它。 RETURN
子句 returns 不仅是你更改的节点的集合,也是最初找到并锁定的节点的集合(可以比你更改的节点的集合更大)。
如果锁定队列节点的唯一原因是写入 processed_by
属性,您可以考虑锁定指定为 :QueueLock 的其他节点。它的唯一目的是被锁定,以便您可以执行队列操作。
这当然需要所有队列操作(用于标记为 processed_by
,并且可能用于任何可能影响它的操作)锁定 :QueueLock。
您还需要在 SET 操作之前限制您的节点,因为在您当前的查询中,您只限制了节点的 return(它是 returning 3,但是在此之前,它将所有未分配的节点分配给单个处理器)。
MATCH (lock:QueueLock)
SET lock._LOCK_ = true
MATCH (n:Queue)
WITH n
WHERE NOT exists(n.processed_by)
LIMIT 3
SET n.processed_by = $processor
REMOVE lock._LOCK_
RETURN n
但是,在您的节点上使用不同的标签可能会有所帮助,这样在分配的节点上的操作就不必等待锁定。
如果您为排队的未分配节点保留 :Queue(因此当节点在队列中时 processed_by
将永远不存在),并且为要处理的已分配节点保留 :Assigned,这应该适合您:
MATCH (lock:QueueLock)
SET lock._LOCK_ = true
MATCH (n:Queue)
WITH n
LIMIT 3
SET n.processed_by = $processor
REMOVE n:Queue
SET n:Assigned
REMOVE lock._LOCK_
RETURN n
我用作最终代码(包括排序)的 @cybersam's
// find all of the queued items
MATCH (n:Queue)
// that haven't been marked yet
WHERE NOT exists(n.processed_by)
// take the first $count of those and lock them
WITH n
ORDER BY n.something ASCENDING
LIMIT $count
SET n._LOCK_ = true
// write lock will be released after RETURN "commits", but clean up sooner
REMOVE n._LOCK_
// after locking check if they're still unmarked
// (between the first check and locking they could have been marked by others)
WITH n
WHERE NOT exists(n.processed_by)
// and mark these unmarked ones as ours (count(n) <= than original limit)
SET n.processed_by = $processor, n.processing_started = timestamp()
RETURN n;
根据我的测试,行为是相同的,但不需要命令式 Cypher。
我相信通过对变量名 (WITH n as x
) 的一些操作,也可以得到要返回的 "locked, but not marked" 节点,但我并不需要这些实现细节作为结果查询。