两个 Byte[Array] 的高效按位或

Efficient bitwise OR of two Byte[Array]

我需要在 Spark 中对 2 个非常大 (>1GB) 的 ByteArray 进行比特运算(因此使用 Scala)。

我正在寻找最有效的方法(在速度和内存方面),这意味着我不想使用 'zip' 之类的方法将我的数组转换为列表。

目前,我正在使用以下方法,但我想知道你们中的一些人是否有其他想法...

def bitor(x: Array[Byte], y: Array[Byte]) : Array[Byte] = {
  for(i <- 0 to x.size) {
    x(i) = (x(i) | y(i)).toByte
  }
  return x
}

我应该通过 JNI 并在本机 C 中计算它吗?

我假设您的代码是 运行 在分布式环境中。如果那样,我认为最好的选择是使用 parallel collections API

并行集合使用您机器的多核硬件执行任务,并为开发人员提供简单和 t运行sparent 工作。

我认为这种方法的主要优点是,如果您向云服务添加更多硬件,您的代码将准备就绪,您无需更改任何内容。

我 运行 对您的代码与并行实现进行了一些测试。 请注意,我在我的个人计算机上使用 Scala REPL 进行了这些测试

import scala.collection.parallel.mutable.ParArray
import scala.util.Random

// prepare arrays
val rnd = Random

// parallel arrays
val pArr1 = ParArray.tabulate(20000)(x => rnd.nextInt(100).toByte)
val pArr2 = ParArray.tabulate(20000)(x => rnd.nextInt(100).toByte)

// common arrays
val arr1 = pArr1.toArray
val arr2 = pArr2.toArray

println(pArr1)
println(pArr2)

println(arr1)
println(arr2)

println("Variables loaded")

// define parallel task
def parallel(arr1: ParArray[Byte], arr2: ParArray[Byte]): Unit = {
  val start = System.currentTimeMillis
  val r = (arr1 zip arr2).map(x => x._1 | x._2)
  //println(r)
  println(s"Execution time: ${System.currentTimeMillis - start}")
}

// define single thread task
def bitor(x: Array[Byte], y: Array[Byte]): Unit = {
  val start = System.currentTimeMillis
  for (i <- 0 until x.size) {
    x(i) = (x(i) | y(i)).toByte
  }
  //x.foreach(println)
  println(s"Execution time: ${System.currentTimeMillis - start}")
  //  return x
}

println("functions defined")

我正在生成 20 000 个 运行1 到 100 之间的 dom 数字并将它们转换为字节。

之后每个方法我执行了20次(并行和单线程),执行情况如下:

> (1 to 20).foreach(x => parallel(pArr1, pArr2))


// parallel method (in milliseconds)
1)  Execution time: 10
2)  Execution time: 3
3)  Execution time: 6
4)  Execution time: 4
5)  Execution time: 29
6)  Execution time: 4
7)  Execution time: 4
8)  Execution time: 3
9)  Execution time: 3
10) Execution time: 6
11) Execution time: 1
12) Execution time: 2
13) Execution time: 1
14) Execution time: 1
15) Execution time: 4
16) Execution time: 1
17) Execution time: 1
18) Execution time: 2
19) Execution time: 1
20) Execution time: 1

Avg(11 to 20) = 1.5 milliseconds

//---------------------------------------- ----------------------

(1 to 20).foreach(x => bitor(arr1, arr2))

// bitor method (in milliseconds)
1)  Execution time: 1
2)  Execution time: 0
3)  Execution time: 0
4)  Execution time: 1
5)  Execution time: 0
6)  Execution time: 0
7)  Execution time: 1
8)  Execution time: 0
9)  Execution time: 0
10) Execution time: 3
11) Execution time: 0
12) Execution time: 0
13) Execution time: 0
14) Execution time: 0
15) Execution time: 2
16) Execution time: 0
17) Execution time: 3
18) Execution time: 0
19) Execution time: 1
20) Execution time: 0

Avg(11 to 20) = 0.6 milliseconds

由于 JIT 编译器准备,我放弃了前十次执行。 See more here

如你所见bitor方法比parallel快一点,我不确定parallel方法是否会针对ParallelAPI优化了更好的解决方案,但我认为在分布式云环境中parallel方法应该比bitor更快。

您使用 foreach 的代码脱糖成等效的 java 代码:

public final class _$$anon$$anonfun$bitor extends AbstractFunction1$mcVI$sp implements Serializable {
    private final byte[] x;
    private final byte[] y;

    public _$$anon$$anonfun$bitor(byte[] x, byte[] y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public final void apply(final int i) {
        this.apply$mcVI$sp(i);
    }

    @Override
    public void apply$mcVI$sp(final int i) {
        this.x[i] |= this.y[i];
    }
}


private byte[] bitor(final byte[] x, final byte[] y) {
    RichInt.to$extension0(Predef.intWrapper(0), Predef.byteArrayOps(x).size())
            .foreach(new _$$anon$$anonfun$bitor(x, y));
    return x;
}

但是,如果将 for 理解替换为 while,情况就会发生变化:

def bitor(x: Array[Byte], y: Array[Byte]) : Array[Byte] = {
  var i = 0
  while (i < x.length) {
    x(i) = (x(i) | y(i)).toByte
    i += 1
  }

  x
}

被转译为:

private byte[] bitor(final byte[] x, final byte[] y) {
    for (int i = 0; i < x.length; ++i) {
        x[i] |= y[i];
    }
    return x;
}