Neo4j: Cypher query to parallelize a row of the result from a previous query
I have a database where sentences are related to each other. I have to run one big update over the whole database, so I am trying to parallelize the update.
The relevant Cypher query looks like this:
match (s:Sentence)-[r:RELATED]-(t:Sentence)
return s as sentence, collect(t.embedding) as neighbours_embeddings
embedding is a list of numbers.
This returns results like this:
---------------------------------------
| sentence | neighbours_embeddings |
---------------------------------------
| sentence1 | [[1, 2, 3], [4, 5, 6]] |
---------------------------------------
| sentence2 | [[2, 3, 5]] |
---------------------------------------
Now I want to perform some operations on neighbours_embeddings and set a property on the corresponding Sentence node.
I have looked into different parallelization techniques in Neo4j, and as far as I can tell, they all require a list as input. My input, however, would be a tuple (sentence, neighbours_embeddings). How can I achieve this?
Full query, for those interested:
match (s:Sentence)-[r:RELATED]-(t:Sentence)
with s as sentence, collect(t.embedding) as neighbours
with sentence, [
w in reduce(s=[], neighbour IN neighbours |
case when size(s) = 0 then
neighbour else [
i in range(0, size(s)-1) |
s[i] + neighbour[i]] end) |
w / tofloat(size(neighbours))
] as average
with sentence, [
i in range(0, size(sentence.embedding)-1) |
(0.8 * sentence.embedding[i]) + (0.2 *average[i])
] as unnormalized
with sentence, unnormalized, sqrt(reduce(sum = 0.0, element in unnormalized | sum + element^2)) as divideby
set sentence.normalized = [
i in range(0, size(unnormalized)-1) | (unnormalized[i] / divideby)
]
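To make the arithmetic concrete, here is a standalone sketch that follows the same steps (element-wise average, 0.8/0.2 weighted mix, L2 normalization) on the sentence1 row from the table above; the sentence's own embedding [1, 2, 3] is made up purely for illustration:
// Standalone sketch of the same computation, using literal values.
// The embedding [1, 2, 3] for sentence1 is an assumed value.
WITH [1, 2, 3] AS embedding, [[1, 2, 3], [4, 5, 6]] AS neighbours
WITH embedding,
     [i IN range(0, size(embedding)-1) |
       reduce(acc = 0.0, n IN neighbours | acc + n[i]) / toFloat(size(neighbours))
     ] AS average                        // element-wise mean: [2.5, 3.5, 4.5]
WITH [i IN range(0, size(embedding)-1) |
       (0.8 * embedding[i]) + (0.2 * average[i])
     ] AS unnormalized                   // weighted mix: [1.3, 2.3, 3.3]
WITH unnormalized,
     sqrt(reduce(sum = 0.0, e IN unnormalized | sum + e^2)) AS divideby
RETURN [e IN unnormalized | e / divideby] AS normalized
                                         // roughly [0.31, 0.54, 0.78]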
For parallelization, APOC is your friend, in particular the apoc.periodic.iterate
procedure. In your use case you can parallelize, because each row only updates a property on a single node.
The resulting query looks something like this:
CALL apoc.periodic.iterate("
match (s:Sentence) RETURN s",
"
match (s)-[r:RELATED]-(t:Sentence)
with s as sentence, collect(t.embedding) as neighbours
with sentence, [
w in reduce(s=[], neighbour IN neighbours |
case when size(s) = 0 then
neighbour else [
i in range(0, size(s)-1) |
s[i] + neighbour[i]] end) |
w / tofloat(size(neighbours))
] as average
with sentence, [
i in range(0, size(sentence.embedding)-1) |
(0.8 * sentence.embedding[i]) + (0.2 *average[i])
] as unnormalized
with sentence, unnormalized, sqrt(reduce(sum = 0.0, element in unnormalized | sum + element^2)) as divideby
set sentence.normalized = [
i in range(0, size(unnormalized)-1) | (unnormalized[i] / divideby)
]", {batchSize:1000, parallel:true})
You can experiment with the batchSize parameter. For more details, have a look at the docs.
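If you want to see how a given batch size behaves, a sketch along these lines can help; the yield fields used here (batches, total, failedBatches, errorMessages) and the property written in the inner statement are assumptions on my part, so adjust them to your APOC version and data model:
// Run a small, hypothetical per-sentence update and return the run summary
// that apoc.periodic.iterate reports, so different batch sizes can be compared.
CALL apoc.periodic.iterate(
  "MATCH (s:Sentence) RETURN s",
  "MATCH (s)-[:RELATED]-(t:Sentence)
   WITH s, count(t) AS degree
   SET s.neighbourCount = degree",        // placeholder property, for illustration only
  {batchSize: 500, parallel: true}
) YIELD batches, total, failedBatches, errorMessages
RETURN batches, total, failedBatches, errorMessages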