如何有效地将入站 netty.io ByteBuf 消息分发到 ChannelGroup?

How to efficiently distribute inbound netty.io ByteBuf messages to a ChannelGroup?

我创建了一个 netty.io BootStrap 从旧服务器接收流数据。服务器使用 ISO-8859-1 字符集发送数据。还有一个使用不同分隔符字节的内部 "protocol":

private static final byte GS = 29;  // FS ASCII char 1D (group separator)
private static final byte RS = 30;  // FS ASCII char 1E (record separator)
private static final byte US = 31;  // FS ASCII char 1F (unit separator)

private static final ByteProcessor GROUP_SEPARATOR_LOCATOR = value -> value != GS;
private static final ByteProcessor RECORD_SEPARATOR_LOCATOR = value -> value != RS;
private static final ByteProcessor UNIT_SEPARATOR_LOCATOR = value -> value != US;

这些 ByteProcessor 实例用于拆分消息。每条消息最终都被翻译成相应的对象表示,而 keyValueMapping 包含原始消息的主要内容:

public class Update {
    private final String id;    
    private final UpdateType updateType;
    private final Map<String, byte[]> keyValueMapping;
    // SOME OTHER STUFF
}

随后,所有更新都转发到所有连接的网络套接字客户端,这些客户端由单独的服务器处理BootStrap:

public void distribute(ChannelGroup recipients, Object msg) {
    Update updateMsg = (Update) msg;
    recipients.writeAndFlush(updateMsg);
}

当我激活 Java 飞行记录并执行一些负载测试时,我意识到主要的分配热点是将初始入站消息的值转换为 ISO-8859-1 字节数组的方法:

private byte[] translateValue(ByteBuf in) {
    byte [] result;

    if (!in.hasArray()) {
        result = new byte[in.readableBytes()];
        in.getBytes(in.readerIndex(), result);
    } else {
        result = in.array();
    }
    return result;
}

最初我没有翻译ByteBuf,而是直接将它们存储在Update 的keyValueMapping 映射中。由于 ByteBuf 对象维护了一些不受保护的内部索引(reader、writer、marker 等)——根据设计,我害怕简单地将这些 ByteBuf 包装并转发到不同的通道(参见上面的收件人 channelGroup)并决定改为使用此 byte[] 表示。

检查 Java 飞行记录结果,我想知道是否有任何建议如何将未更改的入站数据分发到一组不同的通道而不会过多地限制 GC? 从结果中学习,直接缓冲区用于给定的通道,因为创建了许多新的字节数组。

为了提供更多上下文,我还添加了执行剩余消息翻译的代码:

while (in.readableBytes() > 0) {
    ByteBuf keyAsByteBuf = nextToken(in, UNIT_SEPARATOR_LOCATOR);
    String key = translateKey(keyAsByteBuf);

    if (key != null) {
        ByteBuf valueAsByteBuf = nextToken(in, RECORD_SEPARATOR_LOCATOR);
        byte[] value = translateValue(valueAsByteBuf);

        if (value.length > 0) {
            mapping.put(key, value); 
        }
    }
}

private ByteBuf nextToken(ByteBuf in, ByteProcessor locator) {
    int separatorIdx = in.forEachByte(in.readerIndex(), in.readableBytes(), locator);

    if (separatorIdx >= 0) {
        ByteBuf token = in.readSlice(separatorIdx - in.readerIndex());
        in.skipBytes(1);
        return token;
    }
    return in.readSlice(in.readableBytes());
}

private String translateKey(ByteBuf in) {
    return keyTranslator.translate(in);
}

嗯... 其实你的问题没那么简单。我会尽量简短地回答。

如果您的应用不需要将 ByteBuf 翻译成 byte[]。所以我假设你有下一个结构:

public class Update {
    private final String id;    
    private final UpdateType updateType;
    private final Map<String, ByteBuf> keyValueMapping;
}

这里的问题是您部分解析了 ByteBuf。所以在这个 java 对象中有 java 个对象 + ByteBuf

完全没问题,您可以进一步操作那些 ByteBuf's。您的 class Update 应该实现 ReferenceCounted 接口。因此,当您执行 recipients.writeAndFlush(updateMsg)(假设收件人是 DefaultChannelGroup)时,netty DefaultChannelGroup 将处理对这些缓冲区的引用。

那么会发生什么:

recipients.writeAndFlush(updateMsg) 之后,DefaultChannelGroup 循环将您的 updateMsg 发送到列表中带有 channel.writeAndFlush(safeDuplicate(message)) 的每个频道。 safeDuplicate 是处理对 netty ByteBuf 的引用的特殊方法,因此您可以将相同的缓冲区发送到多个接收器(它实际上使用 retainedDuplicate() 复制缓冲区)。但是,你的对象不是ByteBuf,而是java对象。这是该方法的代码:

private static Object safeDuplicate(Object message) {
    if (message instanceof ByteBuf) {
        return ((ByteBuf) message).retainedDuplicate();
    } else if (message instanceof ByteBufHolder) {
        return ((ByteBufHolder) message).retainedDuplicate();
    } else {
        return ReferenceCountUtil.retain(message);
    }
}

因此,为了正确处理 ByteBuf 的引用,您需要为 ReferenceCountUtil.retain(message) 实施 ReferenceCounted。类似的东西:

public class Update implements ReferenceCounted {
    @Override
    public final Update retain() {
        return new Update(id, updateType, makeRetainedBuffers());
    }  

    private Map makeRetainedBuffers() {
       Map newMap = new HashMap();
       for (Entry entry : keyValueMapping) {
           newMap.put(entry.key, entry.value.duplicate().retain())
       }
       return newMap;
    }
}

这只是一个伪代码。但你应该明白了。您还必须在 Update class 中实现 release() 方法,并确保它始终释放它持有的缓冲区。并释放里面的所有缓冲区。我假设您已经在管道中为 Update class 调用 release() 的编码器。

另一种选择是实现自己的 DefaultChannelGroup。在这种情况下,您不必依赖 safeDuplicate 方法。因此您不需要实现 ReferenceCounted,但是您仍然需要在 class.

中手动处理保留和释放