比较数组的数组

Comparing Arrays of Arrays

所以我有两个数组,a 和 b,大小不同,包含相同长度的子数组,并且都与子数组具有相同的类型(例如 float)。

我想在数组 a 的子数组中找到 b 中子数组的所有匹配项。

现在我正在寻找一种更快或更好的方法(可能是 CUDA 或 SIMD 编码)。

目前我有类似 (F#) 的内容:

let mutable result = 0.0
for a in arrayA do:
 for b in arrayB do:
  if a = b then 
   result <- result + (a |> Array.sum)

我的数组 a 包含大约 500 万个元素,数组 b 包含大约 3000 个元素。因此我的性能相关问题。

通过将大型数组拆分为较小的数组并并行进行相等性检查,您可以节省一些比较大型数组的时间。

这个块函数直接取自F# Snippets

let chunk chunkSize (arr : _ array) = 
query {
  for idx in 0..(arr.Length - 1) do
  groupBy (idx / chunkSize) into g
  select (g |> Seq.map (fun idx -> arr.[idx]))
}

然后像这样比较数组。我选择将每个数组拆分为 4 个较小的块:

let fastArrayCompare a1 a2 = async {
let! a =
  Seq.zip (chunk 4 a1) (chunk 4 a2)
  |> Seq.map (fun (a1',a2') -> async {return a1' = a2'}) 
  |> Async.Parallel
return Array.TrueForAll (a,(fun t -> t))}

很明显,您现在添加了一些额外的数组拆分时间,但是有很多非常大的数组比较,您应该这次再补上一些。

你用暴力算法解决了这个问题。假设 AB 的大小为 NM分别地,您要检查相等性的每个小数组都是 K 个元素长。您的算法在最坏情况下需要 O(N M K) 时间,在最好情况下需要 O(N M + Z K),因为匹配数是 Z(可能达到N M)。

请注意,每个小数组本质上都是一个字符串。你有两组字符串,你想检测它们之间所有相等的对。

这个问题可以用hash table解决。使用 O(M) 个单元格创建散列 table。在这个table中,不重复地存储数组B的字符串。添加来自 B 的所有字符串后,迭代来自 A 的字符串并检查它们是否存在于散列 table 中。该解决方案可以实现为平均 O((M + N) K) 时间复杂度的随机解决方案,它与输入数据大小成线性关系。

另外,你也可以用非随机的方式来解决这个问题。将所有字符串放入一个数组 X 中并对它们进行排序。在排序期间,将来自 A 的字符串放在来自 B 的所有相等字符串之后。请注意,您应该记住 X 的哪些字符串来自哪个数组。您可以使用一些快速 comparison sort, or use radix sort。在后一种情况下,排序是在线性时间内完成的,即 O((M + N) K).

现在所有常用字符串都连续存储在X中。您可以迭代 X,维护 B 中的字符串集等于当前处理的字符串。如果您看到与前一个字符串不同的字符串,请清除该集合。如果字符串来自 B,则将其添加到集合中。如果来自A,记录它等于来自B的元素集。这是对 X 的单次传递,每个字符串的时间为 O(K),因此需要 O((M + N ) K) 共计时间

如果您的字符串长度K 不是很小,您可以将矢量化添加到字符串操作中。在哈希 table 方法的情况下,大部分时间将花在计算字符串哈希上。如果选择 polynomial hash modulo 2^32,那么很容易用 SSE2 对其进行矢量化。此外,您需要快速的字符串比较,这可以使用 memcmp 函数来完成,它也可以很容易地矢量化。对于排序解决方案,您只需要进行字符串比较。此外,您可能想要实现基数排序,恐怕无法进行矢量化。

两种方法的高效并行化并不是很简单。对于第一个算法,你需要一个并发散列table。实际上,那里甚至还有一些 lock-free hash tables。对于第二种方法,您可以将第一步并行化(快速排序很容易并行化,基数排序则不然)。如果没有太多相等的字符串,第二步也可以并行化:您可以将数组 X 分成几乎相等的部分,只在两个不同的字符串之间分解。