Where do I find ArrayListSerde?
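I need a Serde for ArrayList. Searching the web for ArrayListSerde turns up references to such a thing, but I can't find it in the documentation or in the libraries for the version of Kafka Streams I'm using. Where can I find it?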
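The Kafka Streams library does not provide an official implementation of ArrayListSerde. You need to implement a custom Serde using the Serializer and Deserializer interfaces.
The post below also mentions this: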
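I'm trying something like this. It seems to do sensible things, although I'm not yet sure I'm doing the right thing when serialize is called with a null argument (and I don't yet know why it gets called with null).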
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Note: on older Kafka clients, configure() and close() have no default
// implementations and would also have to be overridden here.
public class ArrayListSerde<T> implements Serde<ArrayList<T>> {
    private final Serializer<T> innerSerialiser;
    private final Deserializer<T> innerDeserialiser;

    public ArrayListSerde(Serde<T> inner) {
        innerSerialiser = inner.serializer();
        innerDeserialiser = inner.deserializer();
    }

    @Override
    public Serializer<ArrayList<T>> serializer() {
        return new Serializer<ArrayList<T>>() {
            @Override
            public byte[] serialize(String topic, ArrayList<T> data) {
                // Kafka's built-in serializers map a null value to null bytes; do the same.
                if (data == null) {
                    return null;
                }
                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                final DataOutputStream dos = new DataOutputStream(baos);
                try {
                    // Layout: element count, then each element as length + bytes.
                    dos.writeInt(data.size());
                    for (T element : data) {
                        // Assumes the inner serializer never returns null (no null elements).
                        final byte[] bytes = innerSerialiser.serialize(topic, element);
                        dos.writeInt(bytes.length);
                        dos.write(bytes);
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Unable to serialize ArrayList", e);
                }
                return baos.toByteArray();
            }
        };
    }

    @Override
    public Deserializer<ArrayList<T>> deserializer() {
        return new Deserializer<ArrayList<T>>() {
            @Override
            public ArrayList<T> deserialize(String topic, byte[] data) {
                if (data == null || data.length == 0) {
                    return null;
                }
                final ArrayList<T> arrayList = new ArrayList<>();
                final DataInputStream dataInputStream =
                        new DataInputStream(new ByteArrayInputStream(data));
                try {
                    final int records = dataInputStream.readInt();
                    for (int i = 0; i < records; i++) {
                        final byte[] valueBytes = new byte[dataInputStream.readInt()];
                        // readFully fills the whole buffer or throws; read() may stop short.
                        dataInputStream.readFully(valueBytes);
                        arrayList.add(innerDeserialiser.deserialize(topic, valueBytes));
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Unable to deserialize ArrayList", e);
                }
                return arrayList;
            }
        };
    }
}
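To check the framing used above (an element count, then each element as a length followed by its bytes) without pulling in Kafka, here is a minimal sketch that round-trips a list of strings through the same layout. LengthPrefixedListCodec, encode and decode are hypothetical names made up for this illustration, and UTF-8 stands in for whatever the inner Serde would do; it is not part of Kafka or of the class above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for ArrayListSerde<String>: same framing, no Kafka dependency.
final class LengthPrefixedListCodec {

    // Layout: [int count], then per element [int length][UTF-8 bytes].
    static byte[] encode(List<String> items) {
        if (items == null) {
            return null; // null in, null out
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(items.size());
            for (String item : items) {
                byte[] bytes = item.getBytes(StandardCharsets.UTF_8);
                out.writeInt(bytes.length);
                out.write(bytes);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    static ArrayList<String> decode(byte[] data) {
        if (data == null || data.length == 0) {
            return null; // mirrors the null/empty check in the deserializer above
        }
        ArrayList<String> items = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes); // fills the buffer completely or throws
                items.add(new String(bytes, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return items;
    }

    public static void main(String[] args) {
        List<String> original = Arrays.asList("a", "", "kafka");
        byte[] wire = encode(original);
        // Compares the decoded list with the input element by element.
        System.out.println(decode(wire).equals(original));
    }
}
```

An empty list still encodes to four bytes (the count), so it comes back as an empty list rather than null; only null or zero-length input decodes to null.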