How to deserialize MessagePack in a Spring Cloud Stream app
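I'm trying to create a Kafka Streams application with Spring Cloud Stream, but I'm struggling to deserialize the input messages, whose values have been encoded with MessagePack.
This is what I have so far: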
// TransactionApplication.java
@SpringBootApplication
public class TransactionApplication {
    public static void main(String[] args) {
        SpringApplication.run(TransactionApplication.class, args);
    }

    public static class TransactionConsumer {
        @Bean
        public Serde<Transaction> transactionSerde() {
            ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
            return new JsonSerde<Transaction>(mapper);
        }

        @Bean
        public Consumer<KStream<String, Transaction>> process() {
            return input -> input.foreach((key, value) -> {
                System.out.println("Key: " + key + " Value: " + value);
            });
        }
    }
}
// Transaction.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
    String item;
    Number amount;
}
I get this error:
java.lang.IllegalStateException: No type information in headers and no default type provided.
My application.yml is:
spring.cloud.stream:
  bindings:
    process-in-0:
      destination: transactions
  kafka:
    streams:
      binder:
        applicationId: transactions-application
        configuration:
          commit.interval.ms: 100
After adding spring.json.value.default.type: com.example.Transaction under the configuration node in application.yml, I get a different error. See below.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 116, 101, 109, 34, 58, 32, 34, 112, 114, 105, 118, 97, 116, 101, 32, 106, 101, 116, 34, 44, 32, 34, 97, 109, 111, 117, 110, 116, 34, 58, 32, 53, 48, 50, 125]] from topic [transactions]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.jackdry.processors.json.Transaction` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (123)
at [Source: (byte[])"{"item": "private jet", "amount": 502}"; line: -1, column: 0]
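The byte values in that exception are worth decoding before changing the serde any further. The sketch below uses only the JDK and copies the values from the stack trace above; the class name PayloadInspector and its helper methods are hypothetical. It shows that the payload reads as UTF-8 JSON text, and that its first byte (123, the character '{') falls in the 0x00 to 0x7f range that MessagePack treats as a complete positive integer. That would be consistent with the "Number value (123)" in the error, and it suggests the producer may be writing plain JSON rather than MessagePack.

```java
import java.nio.charset.StandardCharsets;

public class PayloadInspector {

    // Byte values copied from the SerializationException message above.
    public static final int[] LOGGED = {
        123, 34, 105, 116, 101, 109, 34, 58, 32, 34, 112, 114, 105, 118, 97,
        116, 101, 32, 106, 101, 116, 34, 44, 32, 34, 97, 109, 111, 117, 110,
        116, 34, 58, 32, 53, 48, 50, 125
    };

    // Converts the logged decimal values into a byte array.
    public static byte[] toBytes(int[] values) {
        byte[] out = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            out[i] = (byte) values[i];
        }
        return out;
    }

    // Interprets the payload as UTF-8 text.
    public static String asText(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    // In MessagePack, a byte in 0x00..0x7f is a self-contained positive fixint,
    // so a parser that sees such a first byte reports a number, not a map.
    public static boolean startsWithPositiveFixint(byte[] payload) {
        return payload.length > 0 && (payload[0] & 0x80) == 0;
    }

    public static void main(String[] args) {
        byte[] payload = toBytes(LOGGED);
        System.out.println("As text: " + asText(payload));
        System.out.println("First byte is a MessagePack positive fixint: "
                + startsWithPositiveFixint(payload));
    }
}
```

If the text printed is readable JSON, as it is here, checking what the producer actually writes to the topic is probably a better next step than adjusting the consumer.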
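You need to give the deserializer a hint that tells it which object to create from the encoded payload.
If the record was created by Spring's JsonSerializer, the hint is in the headers.
If not, you must provide the hint in the stream configuration.
You need to show your application.yml/properties.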
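As an alternative to a configuration property, the hint can also be given in code by passing the target type to the serde. This is only a configuration sketch, not a tested fix: it assumes spring-kafka's JsonSerde(Class, ObjectMapper) constructor and the MessagePackFactory from jackson-dataformat-msgpack already used in the question, it assumes the records on the topic really are MessagePack-encoded, and the class name TransactionSerdeConfig is hypothetical.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serde;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerde;

// Hypothetical configuration class; Transaction is the class from the question
// and is assumed to live in the same package.
@Configuration
public class TransactionSerdeConfig {

    @Bean
    public Serde<Transaction> transactionSerde() {
        // Jackson mapper that reads and writes MessagePack instead of JSON text.
        ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
        // Passing Transaction.class gives the deserializer its default type,
        // so it does not depend on type information in the record headers.
        return new JsonSerde<>(Transaction.class, mapper);
    }
}
```

With the type fixed in the constructor, the serde no longer needs the spring.json.value.default.type property for this binding.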