Pack header and data layout in one byte array using ByteBuffer in an efficient way?

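I have a header and data that I need to represent in one byte array. I have a particular format for packing the header into a byte array, and a different format for packing the data into a byte array. Once I have both, I need to make one final byte array out of them.

Below is the layout as defined in C++, which I have to replicate in Java.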

// below is my header offsets layout

// addressedCenter must be the first byte
static constexpr uint32_t  addressedCenter      = 0;
static constexpr uint32_t  version              = addressedCenter + 1;
static constexpr uint32_t  numberOfRecords      = version + 1;
static constexpr uint32_t  bufferUsed           = numberOfRecords + sizeof(uint32_t);
static constexpr uint32_t  location             = bufferUsed + sizeof(uint32_t);
static constexpr uint32_t  locationFrom         = location + sizeof(CustomerAddress);
static constexpr uint32_t  locationOrigin       = locationFrom + sizeof(CustomerAddress);
static constexpr uint32_t  partition            = locationOrigin + sizeof(CustomerAddress);
static constexpr uint32_t  copy                 = partition + 1;

// this is the full size of the header
static constexpr uint32_t headerOffset = copy + 1;
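For reference, the same offsets can be mirrored in Java. This is a sketch: the class name HeaderOffsets is hypothetical, and it takes sizeof(CustomerAddress) as 8 bytes because CustomerAddress is a typedef of uint64_t.

```java
// Hypothetical Java mirror of the C++ header offsets.
final class HeaderOffsets {
    private HeaderOffsets() {}

    static final int ADDRESSED_CENTER  = 0;                               // 1 byte
    static final int VERSION           = ADDRESSED_CENTER + 1;            // 1 byte
    static final int NUMBER_OF_RECORDS = VERSION + 1;                     // uint32_t
    static final int BUFFER_USED       = NUMBER_OF_RECORDS + Integer.BYTES; // uint32_t
    static final int LOCATION          = BUFFER_USED + Integer.BYTES;     // CustomerAddress (8 bytes)
    static final int LOCATION_FROM     = LOCATION + Long.BYTES;           // CustomerAddress
    static final int LOCATION_ORIGIN   = LOCATION_FROM + Long.BYTES;      // CustomerAddress
    static final int PARTITION         = LOCATION_ORIGIN + Long.BYTES;    // 1 byte
    static final int COPY              = PARTITION + 1;                   // 1 byte

    // Full header size, equivalent to headerOffset in the C++ code (36 bytes).
    static final int HEADER_SIZE       = COPY + 1;
}
```

With these constants a receiver can read single fields with absolute getters, e.g. `ByteBuffer.wrap(bytes).getInt(HeaderOffsets.BUFFER_USED)`, without walking the whole header.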

CustomerAddress is a typedef for uint64_t, and it is composed like this:

typedef uint64_t   CustomerAddress;

void client_data(uint8_t datacenter, 
                 uint16_t clientId, 
                 uint8_t dataId, 
                 uint32_t dataCounter,
                 CustomerAddress& customerAddress)
{
    customerAddress = (uint64_t(datacenter) << 56)
                    + (uint64_t(clientId) << 40)
                    + (uint64_t(dataId) << 32)
                    + dataCounter;
}
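Java has no unsigned integer types, so reading these fields back out of a packed CustomerAddress needs unsigned shifts (`>>>`) and masks. A minimal sketch that inverts the bit layout of client_data; the class and method names are hypothetical.

```java
// Hypothetical helpers that extract the fields packed by client_data.
final class CustomerAddressFields {
    private CustomerAddressFields() {}

    // Bits 56..63: uint8_t datacenter, returned as 0..255
    static int datacenter(long address) {
        return (int) (address >>> 56);
    }

    // Bits 40..55: uint16_t clientId, returned as 0..65535
    static int clientId(long address) {
        return (int) ((address >>> 40) & 0xFFFF);
    }

    // Bits 32..39: uint8_t dataId, returned as 0..255
    static int dataId(long address) {
        return (int) ((address >>> 32) & 0xFF);
    }

    // Bits 0..31: uint32_t dataCounter, returned as a long so values above
    // Integer.MAX_VALUE stay positive
    static long dataCounter(long address) {
        return address & 0xFFFFFFFFL;
    }
}
```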

And below is my data layout:

// below is my data layout -
//
// key type - 1 byte
// key len - 1 byte
// key (variable size = key_len)
// timestamp (sizeof uint64_t)
// data size (sizeof uint16_t)
// data (variable size = data size)
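The size of one record follows directly from this layout: 12 fixed bytes plus the key and data lengths. A sketch (RecordLayout is a hypothetical name) that also enforces the limits implied by a one-byte key len and a uint16_t data size:

```java
// Hypothetical helper for the per-record size of the data layout.
final class RecordLayout {
    private RecordLayout() {}

    // key type (1) + key len (1) + timestamp (8) + data size (2)
    static final int FIXED_BYTES = 1 + 1 + Long.BYTES + Short.BYTES;

    static int recordSize(byte[] key, byte[] data) {
        // key len is a single byte, data size is a uint16_t: longer arrays cannot be encoded
        if (key.length > 0xFF) {
            throw new IllegalArgumentException("key longer than 255 bytes: " + key.length);
        }
        if (data.length > 0xFFFF) {
            throw new IllegalArgumentException("data longer than 65535 bytes: " + data.length);
        }
        return FIXED_BYTES + key.length + data.length;
    }
}
```

When decoding, mask the lengths with `& 0xFF` and `& 0xFFFF`, because Java's `byte` and `short` are signed and a key of 200 bytes would otherwise read back as -56.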

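Problem statement:

For one part of the project, I am trying to represent the whole content in a single Java class, so that I can pass in the necessary fields and get the final byte array out of it, with the header first and then the data:

Below is my DataFrame class: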

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;

public final class DataFrame {
  private final byte addressedCenter;
  private final byte version;
  private final Map<byte[], byte[]> keyDataHolder;
  private final long location;
  private final long locationFrom;
  private final long locationOrigin;
  private final byte partition;
  private final byte copy;

  public DataFrame(byte addressedCenter, byte version,
      Map<byte[], byte[]> keyDataHolder, long location, long locationFrom,
      long locationOrigin, byte partition, byte copy) {
    this.addressedCenter = addressedCenter;
    this.version = version;
    this.keyDataHolder = keyDataHolder;
    this.location = location;
    this.locationFrom = locationFrom;
    this.locationOrigin = locationOrigin;
    this.partition = partition;
    this.copy = copy;
  }

  public byte[] serialize() {
    // All of the data is embedded in a binary array with fixed maximum size 70000
    ByteBuffer byteBuffer = ByteBuffer.allocate(70000);
    byteBuffer.order(ByteOrder.BIG_ENDIAN);

    int numOfRecords = keyDataHolder.size();
    int bufferUsed = getBufferUsed(keyDataHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;

    // header layout
    byteBuffer.put(addressedCenter); // byte
    byteBuffer.put(version); // byte
    byteBuffer.putInt(numOfRecords); // int
    byteBuffer.putInt(bufferUsed); // int
    byteBuffer.putLong(location); // long
    byteBuffer.putLong(locationFrom); // long
    byteBuffer.putLong(locationOrigin); // long
    byteBuffer.put(partition); // byte
    byteBuffer.put(copy); // byte

    // now the data layout
    for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
      byte keyType = 0;
      byte keyLength = (byte) entry.getKey().length;
      byte[] key = entry.getKey();
      byte[] data = entry.getValue();
      short dataSize = (short) data.length;

      ByteBuffer dataBuffer = ByteBuffer.wrap(data);
      long timestamp = 0;

      if (dataSize > 10) {
        timestamp = dataBuffer.getLong(2);              
      }       

      byteBuffer.put(keyType);
      byteBuffer.put(keyLength);
      byteBuffer.put(key);
      byteBuffer.putLong(timestamp);
      byteBuffer.putShort(dataSize);
      byteBuffer.put(data);
    }
    return byteBuffer.array();
  }

  private int getBufferUsed(final Map<byte[], byte[]> keyDataHolder) {
    int size = 36;
    for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
      size += 1 + 1 + 8 + 2;
      size += entry.getKey().length;
      size += entry.getValue().length;
    }
    return size;
  }  
}

And here is how I use the DataFrame class above:

  public static void main(String[] args) throws IOException {
    // header layout
    byte addressedCenter = 0;
    byte version = 1;

    long location = packCustomerAddress((byte) 12, (short) 13, (byte) 32, (int) 120);
    long locationFrom = packCustomerAddress((byte) 21, (short) 23, (byte) 41, (int) 130);
    long locationOrigin = packCustomerAddress((byte) 21, (short) 24, (byte) 41, (int) 140);

    byte partition = 3;
    byte copy = 0;

    // this map will have key as the actual key and value as the actual data, both in byte array
    // for now I am storing only two entries in this map
    Map<byte[], byte[]> keyDataHolder = new HashMap<byte[], byte[]>();
    for (int i = 1; i <= 2; i++) {
      keyDataHolder.put(generateKey(), getMyData());
    }

    DataFrame records =
        new DataFrame(addressedCenter, version, keyDataHolder, location, locationFrom,
            locationOrigin, partition, copy);

    // this will give me final packed byte array
    // which will have header and data in it.
    byte[] packedArray = records.serialize();
  }

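  private static long packCustomerAddress(byte datacenter, short clientId, byte dataId,
      int dataCounter) {
    // Mask each field before shifting: Java sign-extends byte/short/int to long,
    // so a negative clientId or dataCounter would otherwise overwrite the higher fields
    // (the C++ version uses unsigned types and has no such issue).
    return ((datacenter & 0xFFL) << 56) | ((clientId & 0xFFFFL) << 40)
        | ((dataId & 0xFFL) << 32) | (dataCounter & 0xFFFFFFFFL);
  }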

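As you can see in my DataFrame class, I am allocating a ByteBuffer with a predefined size of 70000. Is there a better way to choose the size I allocate for the ByteBuffer instead of hardcoding 70000?

Also, is there any better way than what I am doing to pack my header and data into one byte array? I also need to make sure it is thread-safe, since it can be called by multiple threads.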

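An alternative is a DataOutputStream around a ByteArrayOutputStream, but you should concentrate performance tuning where it is needed, and this isn't one of those places. Efficiency is not an issue here at all: network I/O will dominate.

Another reason to use a ByteArrayOutputStream is that you don't have to guess the buffer size in advance: it grows as needed.

To keep it thread-safe, use only local variables.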
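A minimal sketch of that approach, assuming the same header and record layout as the question's DataFrame, including its timestamp rule; StreamFrameSerializer is a hypothetical name. DataOutputStream writes big-endian, like the ByteBuffer default, and all state lives in local variables.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical stream-based variant of DataFrame.serialize().
final class StreamFrameSerializer {
    private StreamFrameSerializer() {}

    static byte[] serialize(byte addressedCenter, byte version, long location, long locationFrom,
                            long locationOrigin, byte partition, byte copy,
                            Map<byte[], byte[]> keyDataHolder) {
        // bufferUsed is stored in the header, so it is still computed up front:
        // 36 header bytes, then 12 fixed bytes plus key and data per record.
        int bufferUsed = 36;
        for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
            bufferUsed += 1 + 1 + 8 + 2 + entry.getKey().length + entry.getValue().length;
        }

        // Only local variables: concurrent calls share nothing.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(); // grows as needed
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // Header
            out.writeByte(addressedCenter);
            out.writeByte(version);
            out.writeInt(keyDataHolder.size());
            out.writeInt(bufferUsed);
            out.writeLong(location);
            out.writeLong(locationFrom);
            out.writeLong(locationOrigin);
            out.writeByte(partition);
            out.writeByte(copy);

            // Records
            for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
                byte[] key = entry.getKey();
                byte[] data = entry.getValue();
                // Same timestamp rule as the question's serialize()
                long timestamp = data.length > 10 ? ByteBuffer.wrap(data).getLong(2) : 0L;
                out.writeByte(0);             // key type
                out.writeByte(key.length);
                out.write(key);
                out.writeLong(timestamp);
                out.writeShort(data.length);
                out.write(data);
            }
        } catch (IOException e) {
            // Not expected for an in-memory stream; DataOutputStream merely declares it.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

The returned array is exactly as long as the bytes written, so there is no fixed 70000-byte tail to worry about.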

Is there a better way by which I can allocate the size I am using while making ByteBuffer instead of using a hardcoded 70000?

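There are at least two non-overlapping approaches, and you may use both.

One is buffer pooling. Find out how many buffers you need at peak times and use a limit above that, e.g. max + max / 2, max + average, max + mode, or 2 * max.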

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.function.Function;

public class ByteBufferPool {
    private final int bufferCapacity;
    private final LinkedBlockingDeque<ByteBuffer> queue;

    public ByteBufferPool(int limit, int bufferCapacity) {
        if (limit < 0) throw new IllegalArgumentException("limit must not be negative.");
        if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");

        this.bufferCapacity = bufferCapacity;
        this.queue = (limit == 0) ? null : new LinkedBlockingDeque<>(limit);
    }

    public ByteBuffer acquire() {
        ByteBuffer buffer = (queue == null) ? null : queue.pollFirst();
        if (buffer == null) {
            buffer = ByteBuffer.allocate(bufferCapacity);
        }
        else {
            buffer.clear();
            buffer.order(ByteOrder.BIG_ENDIAN);
        }
        return buffer;
    }

    public boolean release(ByteBuffer buffer) {
        if (buffer == null) throw new IllegalArgumentException("buffer must not be null.");
        if (buffer.capacity() != bufferCapacity) throw new IllegalArgumentException("buffer has unsupported capacity.");
        if (buffer.isDirect()) throw new IllegalArgumentException("buffer must not be direct.");
        if (buffer.isReadOnly()) throw new IllegalArgumentException("buffer must not be read-only.");

        return (queue == null) ? false : queue.offerFirst(buffer);
    }

    public void withBuffer(Consumer<ByteBuffer> action) {
        if (action == null) throw new IllegalArgumentException("action must not be null.");

        ByteBuffer buffer = acquire();
        try {
            action.accept(buffer);
        }
        finally {
            release(buffer);
        }
    }

    public <T> T withBuffer(Function<ByteBuffer, T> function) {
        if (function == null) throw new IllegalArgumentException("function must not be null.");

        ByteBuffer buffer = acquire();
        try {
            return function.apply(buffer);
        }
        finally {
            release(buffer);
        }
    }

    public <T> CompletionStage<T> withBufferAsync(Function<ByteBuffer, CompletionStage<T>> asyncFunction) {
        if (asyncFunction == null) throw new IllegalArgumentException("asyncFunction must not be null.");

        ByteBuffer buffer = acquire();
        CompletionStage<T> future = null;
        try {
            future = asyncFunction.apply(buffer);
        }
        finally {
            if (future == null) {
                release(buffer);
            }
            else {
                future = future.whenComplete((result, throwable) -> release(buffer));
            }
        }
        return future;
    }
}

withBuffer 方法允许直接使用池,而 acquirerelease 允许分离获取点和释放点。
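The limit passed to ByteBufferPool can be derived from measured usage. A hypothetical tracker, assuming you call acquired() and released() next to the pool's acquire and release, that records the peak number of buffers in use and applies the max + max / 2 heuristic:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts buffers currently in use and remembers the highest count.
final class BufferUsageTracker {
    private final AtomicInteger inUse = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    void acquired() {
        int now = inUse.incrementAndGet();
        peak.accumulateAndGet(now, Math::max); // thread-safe running maximum
    }

    void released() {
        inUse.decrementAndGet();
    }

    int peak() {
        return peak.get();
    }

    // One of the heuristics above: max + max / 2
    int suggestedLimit() {
        int max = peak.get();
        return max + max / 2;
    }
}
```

Other heuristics from the list (max + average, 2 * max) only change suggestedLimit().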

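The other is to separate the serialization interface (e.g. put, putInt and putLong) from its implementation, so that you can implement a byte-counting class and an actual byte-buffering class. You should add a method to such an interface to tell whether the serializer is counting bytes or buffering them, to avoid generating bytes unnecessarily, and another method to increase the byte count directly, which is useful for computing the size of a string in some encoding without actually serializing it.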

public interface ByteSerializer {
    ByteSerializer put(byte value);

    ByteSerializer putInt(int value);

    ByteSerializer putLong(long value);

    boolean isSerializing();

    ByteSerializer add(int bytes);

    int position();
}

public class ByteCountSerializer implements ByteSerializer {
    private int count = 0;

    @Override
    public ByteSerializer put(byte value) {
        count += 1;
        return this;
    }

    @Override
    public ByteSerializer putInt(int value) {
        count += 4;
        return this;
    }

    @Override
    public ByteSerializer putLong(long value) {
        count += 8;
        return this;
    }

    @Override
    public boolean isSerializing() {
        return false;
    }

    @Override
    public ByteSerializer add(int bytes) {
        if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");

        count += bytes;
        return this;
    }

    @Override
    public int position() {
        return count;
    }
}

import java.nio.ByteBuffer;

public class ByteBufferSerializer implements ByteSerializer {
    private final ByteBuffer buffer;

    public ByteBufferSerializer(int bufferCapacity) {
        if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");

        this.buffer = ByteBuffer.allocate(bufferCapacity);
    }

    @Override
    public ByteSerializer put(byte value) {
        buffer.put(value);
        return this;
    }

    @Override
    public ByteSerializer putInt(int value) {
        buffer.putInt(value);
        return this;
    }

    @Override
    public ByteSerializer putLong(long value) {
        buffer.putLong(value);
        return this;
    }

    @Override
    public boolean isSerializing() {
        return true;
    }

    @Override
    public ByteSerializer add(int bytes) {
        if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");

        for (int b = 0; b < bytes; b++) {
            buffer.put((byte)0);
        }
        return this;
        // or throw new UnsupportedOperationException();
    }

    @Override
    public int position() {
        return buffer.position();
    }

    public ByteBuffer buffer() {
        return buffer;
    }
}

In your code, you'd do something along these lines (untested):

ByteCountSerializer counter = new ByteCountSerializer();
dataFrame.serialize(counter);
ByteBufferSerializer serializer = new ByteBufferSerializer(counter.position());
dataFrame.serialize(serializer);
ByteBuffer buffer = serializer.buffer();
// ... write buffer, ?, profit ...

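Your DataFrame.serialize method should be refactored to accept a ByteSerializer, and wherever it would generate data, it should check isSerializing to know whether it should only compute the size or actually write bytes.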
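A sketch of that refactoring under a few assumptions: FrameSerializer is a hypothetical extended variant of ByteSerializer that also has putShort and put(byte[]), since the data layout needs them; SketchFrame is a trimmed-down stand-in for DataFrame; and bufferUsed is the total frame size, as in getBufferUsed. The counting pass skips decoding the timestamp, which is where isSerializing pays off.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical extension of ByteSerializer with the extra puts the data layout needs.
interface FrameSerializer {
    FrameSerializer put(byte value);
    FrameSerializer put(byte[] values);
    FrameSerializer putShort(short value);
    FrameSerializer putInt(int value);
    FrameSerializer putLong(long value);
    boolean isSerializing();
    int position();
}

// Counting pass: only advances a counter.
final class CountingFrameSerializer implements FrameSerializer {
    private int count;
    @Override public FrameSerializer put(byte value) { count += 1; return this; }
    @Override public FrameSerializer put(byte[] values) { count += values.length; return this; }
    @Override public FrameSerializer putShort(short value) { count += Short.BYTES; return this; }
    @Override public FrameSerializer putInt(int value) { count += Integer.BYTES; return this; }
    @Override public FrameSerializer putLong(long value) { count += Long.BYTES; return this; }
    @Override public boolean isSerializing() { return false; }
    @Override public int position() { return count; }
}

// Writing pass: a heap ByteBuffer (big-endian by default) of the counted size.
final class BufferFrameSerializer implements FrameSerializer {
    private final ByteBuffer buffer;
    BufferFrameSerializer(int capacity) { this.buffer = ByteBuffer.allocate(capacity); }
    @Override public FrameSerializer put(byte value) { buffer.put(value); return this; }
    @Override public FrameSerializer put(byte[] values) { buffer.put(values); return this; }
    @Override public FrameSerializer putShort(short value) { buffer.putShort(value); return this; }
    @Override public FrameSerializer putInt(int value) { buffer.putInt(value); return this; }
    @Override public FrameSerializer putLong(long value) { buffer.putLong(value); return this; }
    @Override public boolean isSerializing() { return true; }
    @Override public int position() { return buffer.position(); }
    ByteBuffer buffer() { return buffer; }
}

// Hypothetical trimmed-down DataFrame.
final class SketchFrame {
    private final byte addressedCenter, version, partition, copy;
    private final long location, locationFrom, locationOrigin;
    private final Map<byte[], byte[]> keyDataHolder;

    SketchFrame(byte addressedCenter, byte version, Map<byte[], byte[]> keyDataHolder,
                long location, long locationFrom, long locationOrigin, byte partition, byte copy) {
        this.addressedCenter = addressedCenter;
        this.version = version;
        this.keyDataHolder = keyDataHolder;
        this.location = location;
        this.locationFrom = locationFrom;
        this.locationOrigin = locationOrigin;
        this.partition = partition;
        this.copy = copy;
    }

    void serialize(FrameSerializer out, int bufferUsed) {
        out.put(addressedCenter).put(version)
           .putInt(keyDataHolder.size()).putInt(bufferUsed)
           .putLong(location).putLong(locationFrom).putLong(locationOrigin)
           .put(partition).put(copy);
        for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
            byte[] key = entry.getKey();
            byte[] data = entry.getValue();
            // Only decode the timestamp when bytes are really being written.
            long timestamp = (out.isSerializing() && data.length > 10)
                    ? ByteBuffer.wrap(data).getLong(2) : 0L;
            out.put((byte) 0).put((byte) key.length).put(key)
               .putLong(timestamp).putShort((short) data.length).put(data);
        }
    }

    byte[] toBytes() {
        CountingFrameSerializer counter = new CountingFrameSerializer();
        serialize(counter, 0);                 // the bufferUsed value does not affect the count
        int size = counter.position();
        BufferFrameSerializer writer = new BufferFrameSerializer(size);
        serialize(writer, size);
        return writer.buffer().array();        // exactly `size` bytes, no unused tail
    }
}
```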

I leave combining both approaches as an exercise, mostly because it depends a lot on how you decide to do it.

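For instance, you may make ByteBufferSerializer use the pool directly and keep an arbitrary capacity (e.g. your 70000); you may pool ByteBuffers by capacity (not the exact required capacity, but the smallest power of 2 greater than it, setting the buffer's limit before returning from acquire); or you may pool ByteBufferSerializers directly, as long as you add a reset() method.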
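A sketch of the pool-by-capacity variant, assuming heap buffers and, unlike ByteBufferPool above, no limit on how many buffers are kept; SizeClassBufferPool is a hypothetical name. It rounds up to the next power of two at or above the requested size (so an exact power of two is used as-is) and sets the limit to the requested size.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical unbounded pool that groups heap ByteBuffers by power-of-two capacity.
final class SizeClassBufferPool {
    private final Map<Integer, ConcurrentLinkedDeque<ByteBuffer>> pools = new ConcurrentHashMap<>();

    // Smallest power of two >= required (1 for required <= 1).
    static int sizeClass(int required) {
        if (required < 0 || required > (1 << 30)) {
            throw new IllegalArgumentException("unsupported capacity: " + required);
        }
        return required <= 1 ? 1 : Integer.highestOneBit(required - 1) << 1;
    }

    ByteBuffer acquire(int required) {
        int capacity = sizeClass(required);
        ByteBuffer buffer = pools.computeIfAbsent(capacity, c -> new ConcurrentLinkedDeque<>()).pollFirst();
        if (buffer == null) {
            buffer = ByteBuffer.allocate(capacity);
        }
        buffer.clear();                    // position 0, limit = capacity
        buffer.order(ByteOrder.BIG_ENDIAN); // clear() does not reset the byte order
        buffer.limit(required);            // callers only see the bytes they asked for
        return buffer;
    }

    void release(ByteBuffer buffer) {
        int capacity = buffer.capacity();
        // Only power-of-two capacities belong to a size class.
        if (Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("buffer capacity is not a power of two: " + capacity);
        }
        pools.computeIfAbsent(capacity, c -> new ConcurrentLinkedDeque<>()).offerFirst(buffer);
    }
}
```

With the question's 70000 bytes this hands out 131072-byte buffers whose limit is 70000, so a frame of, say, 70 bytes can reuse a 128-byte buffer instead of a 70000-byte one.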

Also is there any better way as compared to what I am doing which packs my header and data in one byte array?

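Yes. Pass byte-buffering instances around instead of having methods return byte arrays that are discarded right after their length is checked or their contents are copied.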
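For example, a hypothetical helper that appends one record of the data layout to a buffer supplied by the caller, rather than returning its own byte[] that the caller would then copy into the frame:

```java
import java.nio.ByteBuffer;

// Hypothetical helper: writes straight into the caller's buffer, no intermediate byte[].
final class RecordWriter {
    private RecordWriter() {}

    static void writeRecord(ByteBuffer out, byte keyType, byte[] key, long timestamp, byte[] data) {
        out.put(keyType)
           .put((byte) key.length)       // key len
           .put(key)
           .putLong(timestamp)
           .putShort((short) data.length) // data size
           .put(data);
    }
}
```

The header can be written the same way, so a single buffer (possibly from a pool) collects the whole frame.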

I also need to make sure it is thread safe since it can be called by multiple threads.

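As long as each buffer is used by only one thread at a time, with proper synchronization, you don't have to worry.

Proper synchronization means that your pool manager has acquire and release semantics in its methods, and, if multiple threads use a buffer between taking it from the pool and returning it, that there are release semantics in the thread that stops using the buffer and acquire semantics in the thread that starts using it. For instance, you don't have to worry about this if you pass the buffer through CompletableFutures, or if you communicate explicitly between threads with an Exchanger or a proper implementation of BlockingQueue.

From the package description of java.util.concurrent: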

The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-level synchronization. In particular:

  • Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.

  • Actions in a thread prior to the submission of a Runnable to an Executor happen-before its execution begins. Similarly for Callables submitted to an ExecutorService.

  • Actions taken by the asynchronous computation represented by a Future happen-before actions subsequent to the retrieval of the result via Future.get() in another thread.

  • Actions prior to "releasing" synchronizer methods such as Lock.unlock, Semaphore.release, and CountDownLatch.countDown happen-before actions subsequent to a successful "acquiring" method such as Lock.lock, Semaphore.acquire, Condition.await, and CountDownLatch.await on the same synchronizer object in another thread.

  • For each pair of threads that successfully exchange objects via an Exchanger, actions prior to the exchange() in each thread happen-before those subsequent to the corresponding exchange() in another thread.

  • Actions prior to calling CyclicBarrier.await and Phaser.awaitAdvance (as well as its variants) happen-before actions performed by the barrier action, and actions performed by the barrier action happen-before actions subsequent to a successful return from the corresponding await in other threads.
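A minimal sketch of such a handoff with a BlockingQueue, relying on the first guarantee above; BufferHandoff is a hypothetical name. The producer writes, puts the buffer on the queue and never touches it again; the consumer reads only after take().

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo: ownership of a buffer moves from one thread to another via a queue.
final class BufferHandoff {
    private BufferHandoff() {}

    static long handOff() {
        BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(0, 123456789L);   // absolute put, written before queue.put()
            try {
                queue.put(buffer);           // the producer stops using the buffer here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // put() happens-before the matching take(), so the write above is visible.
            ByteBuffer received = queue.take();
            producer.join();
            return received.getLong(0);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the buffer", e);
        }
    }
}
```

No lock or volatile field is needed on the buffer itself: the queue provides the release/acquire pair.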