在消费者中反序列化加密的kafka消息
Deserialize encrypted kafka message in consumer
我有一个 kafka 主题,其中包含没有 avro 架构的消息。我们最近想使用 avro 模式将消息推送到该主题。
现在主题有两条消息 with/without 架构。
我有一个消费者从这个主题消费。
-> 如果我在消费者配置中将 value.deserializer
设置为“KafkaAvroDeserializer.class
”,我看不到任何消息被消费。
-> 如果我在消费者配置中将 value.deserializer
设置为“StringDeserializer.class
”,我可以使用这些消息,但是,具有 avro 模式的消息现在看起来是加密的。
例如:ConsumerRecord(topic = sample-events, partition = 2, offset = 1089595, CreateTime = 1544116093932, checksum = 2421249481, serialized key size = -1, serialized value size = 159, key = null, value = ���test_impressLbhpb_extranet_opportunity_cleaning_fecron�����YH00756f54-ba55-11e7-8df0-fdb86cefa6ed$abcde)
.
我已经为 avro 模式生成了 java classes,我想将来自消费者的消息 with/without 模式投射到这个生成的 avro java class。我可以使用 objectMapper.
将没有模式的消息映射到 avro java class
但是对于来自消费者的带有 avro 模式的消息,看起来像示例中提到的那样加密,我正在尝试下面的代码片段:
SpecificDatumReader<SampleEvents> reader = new SpecificDatumReader<SampleEvents>(SampleEvents.getClassSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(record), null);
SampleEvents event = reader.read(null, decoder);
但这不起作用。我收到“错误:
java.lang.ArrayIndexOutOfBoundsException: -1".
如何反序列化此消息?
If I set value.deserializer in consumer config as "KafkaAvroDeserializer.class", I don't see any messages being consumed.
嗯,你至少应该收到 HTTP 或反序列化器错误...
首先,您应该使用 BytesDeserializer
或其变体
然后,你需要熟悉ByteBuffer
的方法,把byte[]
变成一个....
如果您有 Schema Registry 编码的 Avro 消息,那么它们有 well-defined wire format
因此,您可能会得到如下内容,但最后,它需要对主题中可能包含哪些数据进行一些推断。
// consumerConfig.put("value.deserializer", ByteBufferDeserializer.class)
ByteBuffer buf = record.value();
Deserializer d;
if (buf == null) {
System.err.println("Tombstoned record");
} else if (buf.get() == 0x0) { // Check for Avro
int schemaId = buf.getInt(); // If you wanted it
d = new KafkaAvroDeserializer();
Map<String, String> config = new HashMap<>();
config.put("schema.registry.url", "http://..."); // address to registry
boolean isKey = false;
d.configure(config, isKey);
AvroValue v = d.deserialize(value);
// TODO: Handle record
} else {
try {
d = new StringDeserializer();
String s = d.deserialize(value);
// TODO: Handle record
} catch (Exception e) {
e.printStackTrace();
}
}
要点:不要在主题中生成 Avro 和非 Avro 数据类型。否则,您只需要按字节消费并自己处理自定义逻辑。
我有一个 kafka 主题,其中包含没有 avro 架构的消息。我们最近想使用 avro 模式将消息推送到该主题。
现在主题有两条消息 with/without 架构。
我有一个消费者从这个主题消费。
-> 如果我在消费者配置中将 value.deserializer
设置为“KafkaAvroDeserializer.class
”,我看不到任何消息被消费。
-> 如果我在消费者配置中将 value.deserializer
设置为“StringDeserializer.class
”,我可以使用这些消息,但是,具有 avro 模式的消息现在看起来是加密的。
例如:ConsumerRecord(topic = sample-events, partition = 2, offset = 1089595, CreateTime = 1544116093932, checksum = 2421249481, serialized key size = -1, serialized value size = 159, key = null, value = ���test_impressLbhpb_extranet_opportunity_cleaning_fecron�����YH00756f54-ba55-11e7-8df0-fdb86cefa6ed$abcde)
.
我已经为 avro 模式生成了 java classes,我想将来自消费者的消息 with/without 模式投射到这个生成的 avro java class。我可以使用 objectMapper.
将没有模式的消息映射到 avro java class但是对于来自消费者的带有 avro 模式的消息,看起来像示例中提到的那样加密,我正在尝试下面的代码片段:
SpecificDatumReader<SampleEvents> reader = new SpecificDatumReader<SampleEvents>(SampleEvents.getClassSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(record), null);
SampleEvents event = reader.read(null, decoder);
但这不起作用。我收到“错误:
java.lang.ArrayIndexOutOfBoundsException: -1".
如何反序列化此消息?
If I set value.deserializer in consumer config as "KafkaAvroDeserializer.class", I don't see any messages being consumed.
嗯,你至少应该收到 HTTP 或反序列化器错误...
首先,您应该使用 BytesDeserializer
或其变体
然后,你需要熟悉ByteBuffer
的方法,把byte[]
变成一个....
如果您有 Schema Registry 编码的 Avro 消息,那么它们有 well-defined wire format
因此,您可能会得到如下内容,但最后,它需要对主题中可能包含哪些数据进行一些推断。
// consumerConfig.put("value.deserializer", ByteBufferDeserializer.class)
ByteBuffer buf = record.value();
Deserializer d;
if (buf == null) {
System.err.println("Tombstoned record");
} else if (buf.get() == 0x0) { // Check for Avro
int schemaId = buf.getInt(); // If you wanted it
d = new KafkaAvroDeserializer();
Map<String, String> config = new HashMap<>();
config.put("schema.registry.url", "http://..."); // address to registry
boolean isKey = false;
d.configure(config, isKey);
AvroValue v = d.deserialize(value);
// TODO: Handle record
} else {
try {
d = new StringDeserializer();
String s = d.deserialize(value);
// TODO: Handle record
} catch (Exception e) {
e.printStackTrace();
}
}
要点:不要在主题中生成 Avro 和非 Avro 数据类型。否则,您只需要按字节消费并自己处理自定义逻辑。