Sending a byte array of approximately fixed size to another method every time

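I have a method which accepts a Partition enum parameter. This method is called by multiple background threads (up to 15) during the same time period, each thread passing a different value of partition. Here dataHoldersByPartition is an ImmutableMap from Partition to ConcurrentLinkedQueue<DataHolder>: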

  private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;

  //... some code to populate the entries of the `dataHoldersByPartition` map

  private void validateAndSend(final Partition partition) {  
    ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;      
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll())  != null) {      
      byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
      if (clientKeyBytes.length > 255)
        continue;

      byte[] processBytes = dataHolder.getProcessBytes();
      int clientKeyLength = clientKeyBytes.length;
      int processBytesLength = processBytes.length;

      int additionalLength = clientKeyLength + processBytesLength;
      if (totalSize + additionalLength > 50000) {
        Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());
        clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        totalSize = 0;
      }
      clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
      totalSize += additionalLength;
    }
    // calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
    if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
        // argument order must match the constructor Message(Map, Partition)
        Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());      
    }
  }

Below is my Message class:

public final class Message {
  private final byte dataCenter;
  private final byte recordVersion;
  private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;

  public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
    this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  // Output of this method should always be less than 50k
  public byte[] serialize() {
    // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
    int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder);

    ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
    // header layout
    byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
        .putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
        .put(recordsPartition).put(replicated);

    // data layout
    for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
      byte keyType = 0;
      byte[] key = entry.getKey();
      byte[] value = entry.getValue();
      byte keyLength = (byte) key.length;
      short valueLength = (short) value.length;

      ByteBuffer dataBuffer = ByteBuffer.wrap(value);
      long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();

      byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
          .put(value);
    }
    return byteBuffer.array();
  }

  private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
    int size = 36;
    for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
      size += 1 + 1 + 8 + 2;
      size += entry.getKey().length;
      size += entry.getValue().length;
    }
    return size;
  }

  // getters and toString method here
}

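Basically, I have to make sure that whenever sendToDatabase is called from validateAndSend, the byte array returned by message.serialize() is always smaller than 50k. My sendToDatabase method sends the byte array that comes out of serialize. So, for example, if I have 60k records in the dataHolders CLQ, then I will send two chunks from validateAndSend, each of them under 50k.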

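To accomplish this, I use the totalSize variable in validateAndSend to try to measure the 50k size. However, my approach may not be correct. I suspect I might be dropping some records, or sending more than 50k every time.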

My Message class already knows about the clientKeyBytesAndProcessBytesHolder map. Could I use this map to compute the exact size by calling getBufferCapacity, and then call sendToDatabase once the size is just under 50k?
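The sketch below illustrates the idea under one assumption: the wire format is exactly the one written by serialize() above. That format has a 36-byte header written once per message. Each entry then adds 1 + 1 + 8 + 2 = 12 bytes of overhead (key type, key length, timestamp, value length) on top of its key and value bytes. This is the same quantity getBufferCapacity computes. The class name `SizeAwareBatcher` and its methods are hypothetical and are not part of the question's code. The batcher tracks the exact serialized size and flushes before an entry would push the message to the limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not part of the question's code): groups (key, value) pairs so that
// each serialized message stays strictly below maxSize bytes.
public final class SizeAwareBatcher {
    static final int HEADER_SIZE = 36;        // 1+1+4+4+8+8+8+1+1, written once per message
    static final int PER_ENTRY_OVERHEAD = 12; // keyType(1) + keyLength(1) + timestamp(8) + valueLength(2)

    private final int maxSize;
    private final Consumer<List<byte[][]>> sink; // receives each completed batch
    private List<byte[][]> batch = new ArrayList<>();
    private int serializedSize = HEADER_SIZE;    // what the current batch would serialize to

    SizeAwareBatcher(int maxSize, Consumer<List<byte[][]>> sink) {
        this.maxSize = maxSize;
        this.sink = sink;
    }

    static int entrySize(byte[] key, byte[] value) {
        return PER_ENTRY_OVERHEAD + key.length + value.length;
    }

    void add(byte[] key, byte[] value) {
        int size = entrySize(key, value);
        if (HEADER_SIZE + size >= maxSize) {
            throw new IllegalArgumentException("entry of " + size + " bytes can never fit");
        }
        if (serializedSize + size >= maxSize) {
            flush(); // adding this entry would reach the limit: send what we have first
        }
        batch.add(new byte[][] {key, value});
        serializedSize += size;
    }

    void flush() {
        if (!batch.isEmpty()) {
            sink.accept(batch);
            batch = new ArrayList<>();
            serializedSize = HEADER_SIZE;
        }
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        // Recompute each batch's serialized size the same way getBufferCapacity() does.
        SizeAwareBatcher batcher = new SizeAwareBatcher(100, b -> {
            int size = HEADER_SIZE;
            for (byte[][] e : b) {
                size += entrySize(e[0], e[1]);
            }
            sizes.add(size);
        });
        for (int i = 0; i < 5; i++) {
            batcher.add(new byte[10], new byte[8]); // 12 + 10 + 8 = 30 bytes per entry
        }
        batcher.flush();
        System.out.println(sizes); // [96, 96, 66]
    }
}
```

In validateAndSend, the sink would build a Message from each batch and call sendToDatabase. The existing key-length filter (255 bytes) would still run before add().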

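So here is my attempt (the question might be better suited to the Code Review community, but anyway). It relies on some design changes to Message that make it work more like the Builder pattern. The buffer becomes part of the message, and its occupancy is controlled by reacting to BufferOverflowException. When that exception occurs, the buffer is rolled back to the last successfully added item, a new message is allocated, and adding the same piece of data is retried. Once a buffer is complete, the total number of records and the total size are written into the header, and the whole buffer is dumped into a byte array. (I would probably try to avoid this extra conversion and have sendToDatabase operate directly on the buffer, but that is out of scope for now.)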

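// Members of an enclosing test class. The file needs imports for Guava's ImmutableMap and
// Preconditions, java.nio.*, java.nio.charset.StandardCharsets, java.util.concurrent.*,
// java.util.function.Supplier and javax.xml.bind.DatatypeConverter (not bundled with JDK 11+).
static final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition =
    ImmutableMap.of(Partition.A, new ConcurrentLinkedQueue<DataHolder>());

// TODO: structure has been adjusted for testing purposes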
enum Partition
{
    A(0x1);

    private final int _partition;

    int getPartition()
    {
        return _partition;
    }

    Partition(final int partition)
    {
        _partition = partition;
    }
}

// TODO: structure has been adjusted for testing purposes
final static class DataHolder
{
    private final String _clientKey;
    private final byte[] _processBytes;

    public DataHolder(
        final String clientKey,
        final String value)
    {
        _clientKey = clientKey;
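        // explicit charset instead of the platform default
        byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
        // simulate payload: a 2-byte prefix followed by the 8-byte timestamp,
        // matching the getLong(2) read in Message.add()
        final ByteBuffer buffer = ByteBuffer.allocate(2 + 8 + valueBytes.length)
                                            .order(ByteOrder.BIG_ENDIAN);
        buffer.putShort((short) 0).putLong(System.currentTimeMillis()).put(valueBytes);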
        _processBytes = readToBytes(buffer);
    }

    String getClientKey()
    {
        return _clientKey;
    }

    byte[] getProcessBytes()
    {
        return _processBytes;
    }
}

// API has been changed to something more like the Builder pattern
final static class Message
{
    private final long address;
    private final long addressFrom;
    private final long addressOrigin;
    private final byte recordsPartition;
    private final byte replicated;
    private final ByteBuffer buffer;
    private final int writeStatsPosition;
    private int payloadCount;

    Message(Partition recordPartition, int sizeLimit)
    {
        this.recordsPartition = (byte) recordPartition.getPartition();
        this.replicated = 0;
        // TODO: temporarily replaced with a hard-coded constant
        long packedAddress = 123456789L;
        this.address = packedAddress;
        this.addressFrom = 0L;
        this.addressOrigin = packedAddress;
        buffer = ByteBuffer.allocate(sizeLimit).order(ByteOrder.BIG_ENDIAN);
        // TODO: temporarily replaced with a hard-coded constant
        byte dataCenter = 0x1;
        byte recordVersion = 1;
        buffer.put(dataCenter).put(recordVersion);
        writeStatsPosition = buffer.position();
        // placeholders for the record count and the total size, patched in serialize()
        buffer.putInt(0).putInt(0);
        buffer.putLong(address).putLong(addressFrom).putLong(addressOrigin)
                  .put(recordsPartition).put(replicated);
    }

    /**
     * Tries to add another pair of client key and process bytes to
     * the current message. Returns true if successfully added, false -
     * if the data cannot be accommodated due to message binary size limit.
     */
    boolean add(byte[] key, byte[] value)
    {
        try
        {
            byte keyType = 0;
            byte keyLength = (byte) key.length;
            short valueLength = (short) value.length;
            ByteBuffer valueAsBuffer = ByteBuffer.wrap(value);
            long timestamp = valueAsBuffer.capacity() > 10 ? valueAsBuffer.getLong(2) : System.currentTimeMillis();
            payloadCount++;
            // remember position in the buffer to roll back to in case of overflow
            buffer.mark();
            buffer.put(keyType).put(keyLength).put(key);
            buffer.putLong(timestamp).putShort(valueLength).put(value);

            return true;
        }
        catch (BufferOverflowException e)
        {
            payloadCount--;
            buffer.reset();
            return false;
        }
    }

    byte[] serialize()
    {
        int finalPosition = buffer.position();
        // adjust the message header with the totals
        buffer.putInt(writeStatsPosition, payloadCount)
              .putInt(writeStatsPosition + 4, finalPosition);
        return readToBytes(buffer);
    }
}

static void validateAndSend(final Partition partition, final Supplier<Message> messageFactory)
    throws InterruptedException
{
    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Message message = messageFactory.get();
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null)
    {
        final byte[] keyBytes = dataHolder.getClientKey()
                                    .getBytes(StandardCharsets.UTF_8);
        final int keyLength = keyBytes.length;
        if (keyLength > 255)
        {
            continue;
        }

        while (!message.add(keyBytes, dataHolder.getProcessBytes()))
        {
            // TODO: consider proper handling of the case when the buffer size is too small to accept even a single pair
            Preconditions.checkState(message.payloadCount > 0,
                "buffer size too small to accommodate payload");
            final byte[] serializedMessage = message.serialize();
            // TODO: makes sense to introduce a message consumer interface and call it here instead of sendToDatabase() - simplifies testing
            sendToDatabase(message.address, serializedMessage);
            message = messageFactory.get();
        }
    }
    if (message.payloadCount > 0)
    {
        byte[] serializedMessage = message.serialize();
        sendToDatabase(message.address, serializedMessage);
    }
}

static void sendToDatabase(long address, byte[] serializedMessage)
{
    // TODO: simulated activity for testing
    System.out.printf("Sending %d bytes to %d: %s%n",
        serializedMessage.length, address, DatatypeConverter.printHexBinary(serializedMessage));
}

static byte[] readToBytes(ByteBuffer buffer)
{
    buffer.flip();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return bytes;
}

public static void main(String[] args)
    throws ExecutionException, InterruptedException
{
    // TODO: using small value for testing - must be set to 50K in real case
    final int maxMessageSize = 80;
    final Supplier<Message> messageFactory = new Supplier<Message>()
    {
        @Override
        public Message get()
        {
            return new Message(Partition.A, maxMessageSize);
        }
    };

    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(Partition.A);
    dataHolders.add(new DataHolder("0000000001", "alpha"));
    dataHolders.add(new DataHolder("0000000002", "bravo"));
    dataHolders.add(new DataHolder("0000000003", "charlie"));
    dataHolders.add(new DataHolder("0000000004", "delta"));
    dataHolders.add(new DataHolder("0000000005", "echo"));
    dataHolders.add(new DataHolder("0000000006", "foxtrot"));

    validateAndSend(Partition.A, messageFactory);
}
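The rollback in Message.add() relies on two documented ByteBuffer behaviors. First, the single-byte puts that run before a failing bulk put(byte[]) have already advanced the position. Second, reset() moves the position back to the last mark(). The standalone demonstration below shows this; the class name `MarkResetDemo` is only illustrative.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

public final class MarkResetDemo {
    // Writes a 1-byte length prefix followed by the payload; rolls back if it does not fit.
    static boolean tryAppend(ByteBuffer buffer, byte[] payload) {
        buffer.mark();                         // remember where this record starts
        try {
            buffer.put((byte) payload.length); // succeeds while at least 1 byte remains
            buffer.put(payload);               // throws if payload.length > remaining()
            return true;
        } catch (BufferOverflowException e) {
            buffer.reset();                    // discard the partially written record
            return false;
        }
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        System.out.println(tryAppend(buffer, new byte[3]) + " " + buffer.position()); // true 4
        System.out.println(tryAppend(buffer, new byte[5]) + " " + buffer.position()); // false 4
    }
}
```

The second call writes the length byte and then fails, and reset() leaves the buffer exactly as the first record left it. Without the mark, the stray length byte would corrupt the next message.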

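You might get cleaner code by sorting out the responsibilities. Currently, the Message class is responsible for converting DataHolder items into serialized form, but it is also expected to ensure that the size limit is met. Unfortunately, the calling method checks the size expectations without knowing the size requirements of the Message class.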

I suggest making the Message class responsible for sending appropriately sized data chunks. This moves the "knowledge about the proper data chunk formatting" into the Message class itself.

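You might also notice that the current implementation accounts for the full header size for each item, whereas the header is only added once per serialize() call.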
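The formula below is written for the format produced by serialize() in the question. The header is paid once per message, and each entry adds a fixed 12 bytes on top of its key and value. So the constraint the calling code has to enforce for a message with n entries is:

```latex
S = \underbrace{36}_{\text{header, once per message}} + \sum_{i=1}^{n}\bigl(\underbrace{1+1+8+2}_{12\ \text{bytes per entry}} + |k_i| + |v_i|\bigr) < 50\,000
```

Here |k_i| and |v_i| are the key and value lengths in bytes. For comparison, the totalSize variable in the question accumulates only the |k_i| + |v_i| terms, so it undercounts S by 36 + 12n.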

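Please find below a sketch of the suggested improvement. The code needs further polishing; it is mainly meant to illustrate the basic improvements to structure and readability/maintainability.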

To isolate the sendToDatabase() functionality from the Message class, I just added a simple interface:

// decoupling the sending logic from the formatting
// if external requirements suggest linking such functionality into the message class,
// such an interface would be unnecessary
public interface DatabaseDelivery {
    void sendToDatabase(long address, byte[] messagePayload);
}
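Because the interface has a single method, a test can pass in an in-memory implementation that records what would have been sent. Below is a minimal sketch. The names `RecordingDeliveryDemo` and `RecordingDelivery` are hypothetical, and the interface is repeated so that the block compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

public final class RecordingDeliveryDemo {
    // Same shape as the DatabaseDelivery interface above.
    interface DatabaseDelivery {
        void sendToDatabase(long address, byte[] messagePayload);
    }

    // Test double: keeps every payload instead of sending it anywhere.
    static final class RecordingDelivery implements DatabaseDelivery {
        final List<byte[]> payloads = new ArrayList<>();

        @Override
        public void sendToDatabase(long address, byte[] messagePayload) {
            payloads.add(messagePayload.clone()); // copy so later buffer reuse cannot alter it
        }

        int largestPayload() {
            int max = 0;
            for (byte[] p : payloads) {
                max = Math.max(max, p.length);
            }
            return max;
        }
    }

    public static void main(String[] args) {
        RecordingDelivery delivery = new RecordingDelivery();
        DatabaseDelivery asInterface = delivery; // code under test only sees the interface
        asInterface.sendToDatabase(42L, new byte[120]);
        asInterface.sendToDatabase(42L, new byte[80]);
        System.out.println(delivery.payloads.size() + " " + delivery.largestPayload()); // 2 120
    }
}
```

A test would pass such an instance to `new Message(partition, delivery)`, feed it data, and then assert that largestPayload() stays below 50000.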

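The Message class has been changed to handle the chunking and the size limit. It is now Closeable, which indicates that you should call close() at the end. (On current versions of Java, you might therefore consider using the appropriate construct, try-with-resources.)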

import java.io.Closeable;
import java.nio.ByteBuffer;

public final class Message implements Closeable {
    // or initialize it from some external source if this might change dynamically
    private static final int MAX_SIZE = 50000;
    // must stay in sync with addHeader(): 1+1+4+4+8+8+8+1+1 = 36 bytes
    private static final int HEADER_SIZE = 36;

    private final byte dataCenter;
    private final byte recordVersion;
    private final long address;
    private final long addressFrom;
    private final long addressOrigin;
    private final byte recordsPartition;
    private final byte replicated;
    private final DatabaseDelivery delivery;
    private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
    private int pendingItems = 0;

    public Message(final Partition recordPartition, final DatabaseDelivery databaseDelivery) {
        this.recordsPartition = (byte) recordPartition.getPartition();
        this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
        this.recordVersion = 1;
        this.replicated = 0;
        final long packedAddress = new Data().packAddress();
        this.address = packedAddress;
        this.addressFrom = 0L;
        this.addressOrigin = packedAddress;
        this.delivery = databaseDelivery;
    }

    private void addHeader(final ByteBuffer buffer, final int items) {
        buffer.put(dataCenter)
              .put(recordVersion)
              .putInt(items)
              .putInt(buffer.capacity())
              .putLong(address)
              .putLong(addressFrom)
              .putLong(addressOrigin)
              .put(recordsPartition)
              .put(replicated);
    }

    private void sendData() {
        if (itemBuffer.position() == 0) {
            // no data to be sent
            //Properties: itemBuffer serialized size == 0
            return;
        }
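        // allocate exactly header + items, so that buffer.capacity() (written into the
        // header as the total size by addHeader()) equals the real message size
        final ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + itemBuffer.position());
        addHeader(buffer, pendingItems);
        itemBuffer.flip();
        buffer.put(itemBuffer);
        delivery.sendToDatabase(address, buffer.array());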
        itemBuffer.clear();
        pendingItems = 0;
        //Properties: itemBuffer serialized size == 0                
    }

    public void addAndSendJunked(final byte[] key, final byte[] data) {
        if (key.length > 255) {
            return;
        }
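        // the data length is written as a short, so longer values cannot be encoded
        if (data.length > Short.MAX_VALUE) {
            return;
        }
        // narrowed for the wire format only; a key length of 128..255 becomes a negative byte
        final byte keyLength = (byte) key.length;
        final short dataLength = (short) data.length;

        // size arithmetic uses the int lengths, never the narrowed values
        final int additionalSize = key.length + data.length + 1 + 1 + 8 + 2;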
        final int newSize = itemBuffer.position() + additionalSize;
        //Properties: itemBuffer serialized size < MAX
        if (newSize >= (MAX_SIZE-HEADER_SIZE)) {
            sendData();
        }
        if (additionalSize >= (MAX_SIZE-HEADER_SIZE)) {
            //XXX Use exception that is appropriate for your application
            //XXX You might add sizes involved for ease of analysis
            throw new AppConfigurationException("Size of single item exceeds maximum size");
        }
        //Properties: itemBuffer size (old+new or new) < MAX 

        final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
        final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
        // data layout
        itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength).put(data);
        pendingItems++;

        //Properties: itemBuffer size < MAX
    }

    @Override
    public void close() {
        if (pendingItems > 0) {
            sendData();
        }
    }
}

Finally, your calling code becomes:

private void validateAndSend(final Partition partition) {
    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

    // the instance providing sendToDatabase() method
    // just for cutting off details external to the discussion
    final DatabaseDelivery delivery = this;
    final Message message = new Message(partition, delivery);

    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null) {
        // XXX: why is client key using explicit encoding while process bytes is not?
        message.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8), dataHolder.getProcessBytes());
    }
    message.close();
}
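The calling code above closes the message explicitly. With try-with-resources, close() runs whenever the block exits, even on an exception, so the trailing partial chunk is never forgotten. The sketch below uses a hypothetical `Chunker` as a stand-in for the Closeable Message. To stay short, it counts items instead of bytes.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

public final class TryWithResourcesDemo {
    // Hypothetical stand-in for the Closeable Message: flushes every `limit` items and on close().
    static final class Chunker implements Closeable {
        private final int limit;
        private final List<List<String>> sent;
        private List<String> pending = new ArrayList<>();

        Chunker(int limit, List<List<String>> sent) {
            this.limit = limit;
            this.sent = sent;
        }

        void add(String item) {
            if (pending.size() == limit) {
                flush();
            }
            pending.add(item);
        }

        private void flush() {
            if (!pending.isEmpty()) {
                sent.add(pending);
                pending = new ArrayList<>();
            }
        }

        @Override
        public void close() { // declares no IOException, so callers need no catch block
            flush();
        }
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        try (Chunker chunker = new Chunker(2, sent)) {
            for (String s : new String[] {"a", "b", "c"}) {
                chunker.add(s);
            }
        } // close() is called here and sends the trailing [c]
        System.out.println(sent); // [[a, b], [c]]
    }
}
```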

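Please note that I added some markers (XXX) at places that might need attention. (However, these might be explained by information beyond what was provided.)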

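There are some more details that could be considered. For example, I am not convinced that ByteBuffer is the appropriate collection for the given use case (in most places).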

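Edit: Regarding testing, given the small amount of code, you might consider applying formal verification (at least partially). This is similar to what modern compilers do for static code analysis: you walk through your code (with paper and pencil) and derive the properties that hold at each point. I added comments to the code above (marked //Properties) to illustrate how you might do this. (Note: this is a simple illustration; many more properties would have to be derived and completed for each statement.) I only made some minimal statements about the resulting buffer size, using `MAX` as a placeholder for the maximum acceptable size of the item part of the final buffer, i.e. `MAX_SIZE-HEADER_SIZE`.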

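Of course, one might (rightly) suggest writing tests for the critical cases. Those would be white-box tests, checking that the code works correctly at the corner cases of the (known) implementation. You would also need black-box tests that check the behavior of the code against the specification.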
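For the size logic, the white-box corner cases sit right at the boundary: an item that makes the item part exactly reach `MAX_SIZE-HEADER_SIZE` must trigger a flush, and one byte less must not. The sketch below isolates the flush comparison from addAndSendJunked() as a pure function and checks those cases with plain assertions. The names `BoundaryChecks`, `needsFlush` and `itemTooLarge` are hypothetical. The single-item guard is written here with `>=`, so that an item of exactly `MAX_SIZE-HEADER_SIZE` bytes is rejected rather than producing a message of exactly 50,000 bytes.

```java
public final class BoundaryChecks {
    static final int MAX_SIZE = 50000;
    static final int HEADER_SIZE = 36;
    static final int ITEM_LIMIT = MAX_SIZE - HEADER_SIZE; // 49964 bytes for the item part

    // Same comparison as addAndSendJunked(): flush once the item part would reach ITEM_LIMIT.
    static boolean needsFlush(int bufferedItemBytes, int additionalSize) {
        return bufferedItemBytes + additionalSize >= ITEM_LIMIT;
    }

    // A single item must keep the whole message strictly below MAX_SIZE on its own.
    static boolean itemTooLarge(int additionalSize) {
        return additionalSize >= ITEM_LIMIT;
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        check(!needsFlush(ITEM_LIMIT - 13, 12), "ending one byte below the limit must still fit");
        check(needsFlush(ITEM_LIMIT - 12, 12), "exactly reaching the limit must flush first");
        check(!itemTooLarge(ITEM_LIMIT - 1), "largest acceptable single item");
        check(itemTooLarge(ITEM_LIMIT), "an item filling the whole item part is too large");
        // Property: whenever no flush is needed, header + items stays strictly below MAX_SIZE.
        for (int buffered = ITEM_LIMIT - 100; buffered < ITEM_LIMIT; buffered++) {
            if (!needsFlush(buffered, 12)) {
                check(HEADER_SIZE + buffered + 12 < MAX_SIZE, "limit violated at " + buffered);
            }
        }
        System.out.println("all boundary checks passed");
    }
}
```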

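You could also add runtime checks to ensure the correct behavior of critical parts, e.g. checking the maximum size requirement when executing sendToDatabase(). However, such checks only demonstrate correct behavior for the inputs that actually reach them. Properties derived from the code by static analysis can provide evidence of correct behavior, without the lingering doubt that you simply have not found the test case that would cause a failure.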
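One way to add such a runtime check is to wrap the delivery in a decorator that rejects oversized payloads before they reach the database. The sketch below assumes the DatabaseDelivery interface from this answer, repeated so that the block compiles on its own. `SizeCheckingDemo` and `SizeCheckingDelivery` are hypothetical names.

```java
public final class SizeCheckingDemo {
    interface DatabaseDelivery {
        void sendToDatabase(long address, byte[] messagePayload);
    }

    // Decorator: enforces "strictly less than maxSize" on every payload, then delegates.
    static final class SizeCheckingDelivery implements DatabaseDelivery {
        private final DatabaseDelivery delegate;
        private final int maxSize;

        SizeCheckingDelivery(DatabaseDelivery delegate, int maxSize) {
            this.delegate = delegate;
            this.maxSize = maxSize;
        }

        @Override
        public void sendToDatabase(long address, byte[] messagePayload) {
            if (messagePayload.length >= maxSize) {
                throw new IllegalStateException(
                    "payload of " + messagePayload.length + " bytes violates limit " + maxSize);
            }
            delegate.sendToDatabase(address, messagePayload);
        }
    }

    public static void main(String[] args) {
        int[] delivered = {0};
        DatabaseDelivery checked = new SizeCheckingDelivery((address, payload) -> delivered[0]++, 50000);
        checked.sendToDatabase(1L, new byte[49999]);     // accepted
        try {
            checked.sendToDatabase(1L, new byte[50000]); // rejected: not strictly below the limit
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());          // payload of 50000 bytes violates limit 50000
        }
        System.out.println(delivered[0]);                // 1
    }
}
```

Wiring it in, e.g. `new Message(partition, new SizeCheckingDelivery(this, 50000))`, makes any violation fail fast. As noted above, however, it only catches the inputs that actually occur at runtime.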