Where do I find ArrayListSerde?

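I need a Serde for ArrayList. Searching the web for ArrayListSerde turns up references to such a thing, but I can't find it in the documentation or in the libraries of the Kafka Streams version I'm using. Where can I find it?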

The Kafka Streams library does not provide an official ArrayListSerde implementation. You need to implement a custom Serde using the Serializer and Deserializer interfaces.

https://kafka.apache.org/20/documentation/streams/developer-guide/datatypes.html#implementing-custom-serdes

This is also mentioned in the post below:

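I am trying something like this. It seems to do sensible things, although I'm not yet sure I'm handling a null argument to serialize correctly (and I don't yet know why it is being called with null).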

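import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Note: on older client versions where configure() and close() are abstract,
// the Serde and both anonymous classes also need (empty) implementations of them.
public class ArrayListSerde<T> implements Serde<ArrayList<T>> {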

    private final Serializer  <T> innerSerialiser;
    private final Deserializer<T> innerDeserialiser;

    public ArrayListSerde(Serde<T> inner) {
        innerSerialiser   = inner.serializer ();
        innerDeserialiser = inner.deserializer();
    }

    @Override
    public Serializer<ArrayList<T>> serializer() {
        return new Serializer<ArrayList<T>>() {
            @Override
            public byte[] serialize(String topic, ArrayList<T> data) {
                // Kafka can pass null here (e.g. for tombstone records); by convention
                // a null value maps to null bytes rather than an empty payload.
                if (data == null) {
                    return null;
                }
                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                final DataOutputStream dos = new DataOutputStream(baos);
                final Iterator<T> iterator = data.iterator();
                try {
                    // Layout: element count, then a length-prefixed payload per element.
                    dos.writeInt(data.size());
                    while (iterator.hasNext()) {
                        final byte[] bytes = innerSerialiser.serialize(topic, iterator.next());
                        dos.writeInt(bytes.length);
                        dos.write(bytes);
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Unable to serialize ArrayList", e);
                }
                return baos.toByteArray();
            }
        };
    }

    @Override
    public Deserializer<ArrayList<T>> deserializer() {
        return new Deserializer<ArrayList<T>>() {
            @Override
            public ArrayList<T> deserialize(String topic, byte[] data) {
                if (data == null || data.length == 0) {
                    return null;
                }

                final ArrayList<T> arrayList = new ArrayList<>();
                final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(data));

                try {
                    final int records = dataInputStream.readInt();
                    for (int i = 0; i < records; i++) {
                        final byte[] valueBytes = new byte[dataInputStream.readInt()];
                        // readFully fills the whole array or throws; read() may stop short.
                        dataInputStream.readFully(valueBytes);
                        arrayList.add(innerDeserialiser.deserialize(topic, valueBytes));
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Unable to deserialize ArrayList", e);
                }

                return arrayList;
            }
        };
    }
}
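
To check the byte layout without a running Kafka Streams application, here is a minimal sketch of the same count-then-length-prefixed format using only the JDK. ListWireFormat, encode and decode are hypothetical names made up for this illustration, and the Function arguments merely stand in for the inner Serializer and Deserializer; none of this is Kafka API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper (not part of Kafka) mirroring the layout used by ArrayListSerde above.
public class ListWireFormat {

    // Writes: int element count, then for each element an int length followed by its bytes.
    public static <T> byte[] encode(List<T> data, Function<T, byte[]> inner) {
        if (data == null) {
            return null; // null in, null out
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        try {
            dos.writeInt(data.size());
            for (T element : data) {
                byte[] bytes = inner.apply(element);
                dos.writeInt(bytes.length);
                dos.write(bytes);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Reads the layout written by encode; null or empty input yields null.
    public static <T> ArrayList<T> decode(byte[] data, Function<byte[], T> inner) {
        if (data == null || data.length == 0) {
            return null;
        }
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        ArrayList<T> result = new ArrayList<>();
        try {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes); // throws EOFException on truncated input
                result.add(inner.apply(bytes));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> original = Arrays.asList("a", "", "hello");
        byte[] wire = encode(original, s -> s.getBytes(StandardCharsets.UTF_8));
        List<String> copy = decode(wire, b -> new String(b, StandardCharsets.UTF_8));
        System.out.println(copy.equals(original)); // prints true
        System.out.println(wire.length);           // prints 22: 4 + (4+1) + (4+0) + (4+5)
    }
}
```

One consequence of this layout: an empty list still encodes to four bytes (the count), so it round-trips as an empty list, while null stays null. That keeps the two cases distinguishable, which matters if null values are treated as deletes.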