如何在 Spring Cloud Stream App 中反序列化 MessagePack

How to deserialize MessagePack in Spring Cloud Stream App

我正在尝试使用 Spring Cloud Stream 创建一个 Kafka 流应用程序,但我正在努力反序列化输入消息,其值已使用 MessagePack.

进行编码

这是我目前得到的:

// TransactionApplication.java

@SpringBootApplication
public class TransactionApplication {

  public static void main(String[] args) {
    SpringApplication.run(TransactionApplication.class, args);
  }

  public static class TransactionConsumer {

    @Bean
    public Serde<Transaction> transactionSerde() {
      ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
      return new JsonSerde<Transaction>(mapper);
    }

    @Bean
    public Consumer<KStream<String, Transaction>> process() {
      return input -> input.foreach((key, value) -> {
        System.out.println("Key: " + key + " Value: " + value);
      });
    }
  }
}
// Transaction.java

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
  String item;
  Number amount;
}

我遇到错误:

java.lang.IllegalStateException: No type information in headers and no default type provided.

我的application.yml是:

spring.cloud.stream:
  bindings:
    process-in-0:
      destination: transactions
  kafka:
    streams:
      binder:
        applicationId: transactions-application
        configuration:
          commit.interval.ms: 100

在 applicaton.yml 的 configuration 节点下包含 spring.json.value.default.type: com.example.Transaction 后,我收到另一个错误。见下文。

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 116, 101, 109, 34, 58, 32, 34, 112, 114, 105, 118, 97, 116, 101, 32, 106, 101, 116, 34, 44, 32, 34, 97, 109, 111, 117, 110, 116, 34, 58, 32, 53, 48, 50, 125]] from topic [transactions]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.jackdry.processors.json.Transaction` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (123)
 at [Source: (byte[])"{"item": "private jet", "amount": 502}"; line: -1, column: 0]

您需要向反序列化器提供提示,告诉它 object 从编码的有效负载中创建什么。

如果记录是由 Spring JsonSerializer 创建的,则提示在 headers.

如果没有,您必须在流配置中提供提示。

您需要出示您的application.yml/properties