Neo4j: Cypher query to parallelize over the result rows of a previous query

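I have a database in which sentences are related to each other. I have to run one large update over the whole database, so I am trying to parallelize it.

The relevant Cypher query looks like this: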

match (s:Sentence)-[r:RELATED]-(t:Sentence)
return s as sentence, collect(t.embedding) as neighbours_embeddings

embedding is a list of numbers.

This returns a result like this:

---------------------------------------
| sentence   |  neighbours_embeddings |
---------------------------------------
| sentence1  | [[1, 2, 3], [4, 5, 6]] | 
---------------------------------------
| sentence2  | [[2, 3, 5]]            |
---------------------------------------

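Now I want to perform some operations on neighbours_embeddings and set a property on the corresponding Sentence node.

I have looked at the different parallelization techniques in Neo4j and, as far as I understand, they all need a list as input. But my input would be a tuple (sentence, neighbours_embeddings). How can I achieve that?

The full query, for those interested: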

match (s:Sentence)-[r:RELATED]-(t:Sentence)
with s as sentence, collect(t.embedding) as neighbours
with sentence, [
    w in reduce(s=[], neighbour IN neighbours | 
    case when size(s) = 0 then
    neighbour else [
        i in range(0, size(s)-1) |
        s[i] + neighbour[i]] end) |
        w / tofloat(size(neighbours))
    ] as average
 
 with sentence, [
     i in range(0, size(sentence.embedding)-1) |
     (0.8 * sentence.embedding[i]) + (0.2 *average[i])
 ] as unnormalized
 
 with sentence, unnormalized, sqrt(reduce(sum = 0.0, element in unnormalized | sum + element^2)) as divideby
 set sentence.normalized = [
     i in range(0, size(unnormalized)-1) | (unnormalized[i] / divideby)
 ]

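For parallelization, APOC is your friend, in particular the apoc.periodic.iterate procedure. In your use case you can parallelize because each row only updates a property of a single node, so concurrent batches never compete for a lock on the same node.

The resulting query would look something like this: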

CALL apoc.periodic.iterate("
match (s:Sentence) RETURN s",
"
match (s)-[r:RELATED]-(t:Sentence)
with s as sentence, collect(t.embedding) as neighbours
with sentence, [
    w in reduce(s=[], neighbour IN neighbours | 
    case when size(s) = 0 then
    neighbour else [
        i in range(0, size(s)-1) |
        s[i] + neighbour[i]] end) |
        w / tofloat(size(neighbours))
    ] as average
 
 with sentence, [
     i in range(0, size(sentence.embedding)-1) |
     (0.8 * sentence.embedding[i]) + (0.2 *average[i])
 ] as unnormalized
 
 with sentence, unnormalized, sqrt(reduce(sum = 0.0, element in unnormalized | sum + element^2)) as divideby
 set sentence.normalized = [
     i in range(0, size(unnormalized)-1) | (unnormalized[i] / divideby)
 ]", {batchSize:1000, parallel:true})

You can experiment with the batchSize parameter. For more details, see the docs.
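
On the question of a tuple as input: as far as I know, every column returned by the first statement of apoc.periodic.iterate is available as a variable in the second statement, so a row can carry both the node and its collected embeddings. A minimal sketch, assuming a placeholder property neighbourCount (not part of the original data model) that stands in for the real computation:

```cypher
// Sketch only: neighbourCount is a hypothetical property used for illustration.
CALL apoc.periodic.iterate(
  // Driving statement: each row is a (s, neighbours) pair.
  "MATCH (s:Sentence)-[:RELATED]-(t:Sentence)
   RETURN s, collect(t.embedding) AS neighbours",
  // Action statement: s and neighbours are in scope for every row.
  "SET s.neighbourCount = size(neighbours)",
  {batchSize: 1000, parallel: true})
YIELD batches, total, failedBatches, errorMessages
RETURN batches, total, failedBatches, errorMessages
```

Checking failedBatches and errorMessages after the run shows whether any batch failed. This variant passes the embedding lists through the batch parameters, so the version above that matches the neighbours inside the second statement may well use less memory; neither has been measured here.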