在 Neo4J Cypher 中实现并行批处理队列

Implement a parallel batch processing queue in Neo4J Cypher

当我使用 Informix SQL 时,可以使用简单更新的事务隔离来创建类似这样的处理队列:

UPDATE Queue
SET processed_by = ?
WHERE processed_by IS NULL
ORDER BY inserted_at
LIMIT 3

这在第一个参数(?,例如服务器名称和线程编号)中标记最多 3 个项目(队列可能为空)由处理器处理。 UPDATE 在每个并行处理器(单独的线程或机器)上完成后,队列被分片:

+-----+------------------+---------------+
| _id |   inserted_at    | processed_by  |
+-----+------------------+---------------+
|   1 | 2017-03-07 01:15 | host2:thread3 |
|   2 | 2017-03-07 01:16 | host2:thread3 |
|   3 | 2017-03-07 01:17 | host2:thread3 |
|   4 | 2017-03-07 01:18 | host1:thread1 |
|   5 | 2017-03-07 01:19 | host1:thread1 |
|   6 | 2017-03-07 01:20 | host1:thread1 |
|   7 | 2017-03-07 01:21 | NULL          |
+-----+------------------+---------------+

并且每个处理器都可以单独处理一个批处理:

SELECT * from Queue WHERE processed_by = ?

虽然他们可以确定没有其他人接触他们要处理的物品。作为一个副作用,如果一个处理器死了,他们很容易从他们离开的地方继续。


我正在尝试使用 Cypher 实现相同的目的,但是当并行读取观察到相同的状态并因此更新相同的项目时,我 运行 遇到了问题:

MATCH (n:Queue)
WHERE NOT exists(n.processed_by)
SET n.processed_by = $processor
RETURN n
LIMIT 3

在此基础上,我能够lock items并用这个进行处理:

MATCH (n:Queue)
SET n._LOCK_ = true
WITH n
WHERE NOT exists(n.processed_by)
SET n.processed_by = $processor
REMOVE n._LOCK_
RETURN n
LIMIT 3

问题是 _LOCK_ 应用于所有队列节点,而不仅仅是其中的 3 个,如果它们很多,这可能会导致性能下降。如果有时由于其他处理器锁定了所有可用项目,处理器会取回 0 个锁定项目,这不会有问题。

用 Cypher 表达这个的正确方法是什么?
如果可能的话,我还想包括排序(参见 SQL 中的 inserted_at)。
我也很乐意让它不受限制地工作,所以只标记一个没有冲突的项目。

注意:我希望这可以在不使用任何服务器端扩展的情况下实现。

[更新]

您的第一个查询的问题是,如果另一个 thread/process 已经锁定了一个节点(通过设置一个节点 属性;在本例中为 processed_by),尝试仅在同一节点上设置写锁暂时 阻止您的thread/process 代码继续进行。一旦另一个 thread/process 完成它的处理,它的写锁被释放,你的代码将继续设置 processed_by -- 这会覆盖另一个 thread/process 之前写的!

如您所知,您的第二个查询的问题在于它锁定了所有 Queue 节点,这阻止了您可以通过使用多个 threads/processes.[=20= 获得的任何性能改进。 ]

这可能对你有用:

MATCH (n:Queue)
WHERE NOT EXISTS(n.processed_by)
WITH n LIMIT 3
SET n._LOCK_ = true
WITH COLLECT(n) AS lockedNodes
WITH REDUCE(s = [], n IN lockedNodes |
  CASE WHEN NOT EXISTS(n.processed_by) THEN s + n ELSE s END ) AS nodesToSet, lockedNodes
FOREACH(x IN nodesToSet | SET x.processed_by = $processor)
FOREACH(x IN lockedNodes | REMOVE x._LOCK_)
RETURN nodesToSet, lockedNodes;

此查询首先获取最多 3 个没有 processed_by 属性 的节点,并尝试在每个节点上设置写锁(通过设置它们的 _LOCK_ 属性)。这避免锁定所有 Queue 个节点。

如果其他 threads/processes 已经在 1 个或多个相同节点上拥有写锁,则您的 thread/process 将被阻止,直到这些锁被释放。在您的 thread/process 获得所有写锁后,其中一些节点上的 processed_by 属性 可能已被其他 threads/processes 设置。因此,此查询再次测试 processed_by 属性 是否存在,并且仅在仍然不存在时才设置它。 RETURN 子句 returns 不仅是你更改的节点的集合,也是最初找到并锁定的节点的集合(可以比你更改的节点的集合更大)。

如果锁定队列节点的唯一原因是写入 processed_by 属性,您可以考虑锁定指定为 :QueueLock 的其他节点。它的唯一目的是被锁定,以便您可以执行队列操作。

这当然需要所有队列操作(用于标记为 processed_by,并且可能用于任何可能影响它的操作)锁定 :QueueLock。

您还需要在 SET 操作之前限制您的节点,因为在您当前的查询中,您只限制了节点的 return(它是 returning 3,但是在此之前,它将所有未分配的节点分配给单个处理器)。

MATCH (lock:QueueLock)
SET lock._LOCK_ = true
MATCH (n:Queue)
WITH n
WHERE NOT exists(n.processed_by)
LIMIT 3
SET n.processed_by = $processor
REMOVE lock._LOCK_
RETURN n

但是,在您的节点上使用不同的标签可能会有所帮助,这样在分配的节点上的操作就不必等待锁定。

如果您为排队的未分配节点保留 :Queue(因此当节点在队列中时 processed_by 将永远不存在),并且为要处理的已分配节点保留 :Assigned,这应该适合您:

MATCH (lock:QueueLock)
SET lock._LOCK_ = true
MATCH (n:Queue)
WITH n
LIMIT 3
SET n.processed_by = $processor
REMOVE n:Queue
SET n:Assigned
REMOVE lock._LOCK_
RETURN n

我用作最终代码(包括排序)的 @cybersam's 的另一个变体:

// find all of the queued items
MATCH (n:Queue)
  // that haven't been marked yet
  WHERE NOT exists(n.processed_by)
// take the first $count of those and lock them
WITH n
  ORDER BY n.something ASCENDING
  LIMIT $count
  SET n._LOCK_ = true
  // write lock will be released after RETURN "commits", but clean up sooner
  REMOVE n._LOCK_
// after locking check if they're still unmarked
// (between the first check and locking they could have been marked by others)
WITH n
  WHERE NOT exists(n.processed_by)
  // and mark these unmarked ones as ours (count(n) <= than original limit)
  SET n.processed_by = $processor, n.processing_started = timestamp()
RETURN n;

根据我的测试,行为是相同的,但不需要命令式 Cypher。

我相信通过对变量名 (WITH n as x) 的一些操作,也可以得到要返回的 "locked, but not marked" 节点,但我并不需要这些实现细节作为结果查询。