KStream-KStream inner join throws java.lang.ClassCastException
In the process method of a @StreamListener, I map the school KStream to a person KStream and populate the topic "person" via .through(). From that topic I derive the KStream consumed in another @StreamListener method, process1.
MainApplication.java
@SpringBootApplication
public class KafkaStreamsTableJoin {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsTableJoin.class, args);
    }

    @EnableBinding(KStreamProcessorX.class)
    public static class KStreamToTableJoinApplication {

        @StreamListener
        public void process(@Input("school") KStream<SchoolKey, School> schools) {
            schools.map((schoolKey, school) -> {
                        return KeyValue.pair(new PersonKey("Adam", "Smith", schoolKey.getId()), new Person(12));
                    })
                    .through("person", Produced.with(new PersonKeySerde(), new PersonSerde()));
        }

        @StreamListener
        public void process1(@Input("school_1") KStream<SchoolKey, School> schools, @Input("person") KStream<PersonKey, Person> persons) {
            schools.selectKey((schoolKey, school) -> schoolKey.getId())
                    .join(persons.selectKey((personKey, person) -> personKey.getId()),
                            (school, person) -> {
                                System.out.println("school_app2= " + school + ", person_app2= " + person);
                                return null;
                            },
                            JoinWindows.of(Duration.ofSeconds(1)),
                            Joined.with(Serdes.Integer(), new SchoolSerde(), new PersonSerde())
                    );
        }
    }

    interface KStreamProcessorX {

        @Input("person")
        KStream<?, ?> inputPersonKStream();

        @Input("school")
        KStream<?, ?> inputSchoolKStream();

        @Input("school_1")
        KStream<?, ?> inputSchool1KStream();
    }
}
In method process1, this KStream needs to be joined with another KStream, but the following exception occurs:
Exception in thread "stream-join-sample_2-654e8060-5b29-4694-9188-032a9779529c-StreamThread-1" java.lang.ClassCastException: class kafka.streams.join.School cannot be cast to class kafka.streams.join.Person (kafka.streams.join.School and kafka.streams.join.Person are in unnamed module of loader 'app')
at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$reverseJoiner$0(AbstractStream.java:98)
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:889)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
I assume this exception is related to an incorrect serde, but I cannot figure out which serde is causing the problem or how to fix it. Or could it be that the repartitioning triggered by the map() in the process method is related to an incorrect serde?
POJOs and Serdes:
Person.java
public class Person {

    private double age;

    public Person() {
    }

    public Person(double age) {
        this.age = age;
    }

    @JsonGetter("age")
    public double getAge() {
        return age;
    }

    @JsonSetter("age")
    public void setAge(double age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "age=" + age +
                '}';
    }
}
PersonSerde.java
public class PersonSerde extends Serdes.WrapperSerde<Person> {

    public PersonSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(Person.class));
    }
}
PersonKey.java
public class PersonKey {

    private String firstName;
    private String lastName;
    private int id;

    public PersonKey() {
    }

    public PersonKey(String firstName, String lastName, int id) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.id = id;
    }

    @JsonGetter("firstName")
    public String getFirstName() {
        return firstName;
    }

    @JsonSetter("firstName")
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    @JsonGetter("lastName")
    public String getLastName() {
        return lastName;
    }

    @JsonSetter("lastName")
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "PersonKey{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", id=" + id +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PersonKey personKey = (PersonKey) o;
        return id == personKey.id &&
                Objects.equals(firstName, personKey.firstName) &&
                Objects.equals(lastName, personKey.lastName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName, id);
    }
}
PersonKeySerde.java
public class PersonKeySerde extends Serdes.WrapperSerde<PersonKey> {

    public PersonKeySerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(PersonKey.class));
    }
}
The serde and POJO for the School class are analogous to the Person class; a sketch of what they might look like follows below.
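For reference, here is a minimal sketch of the School side, mirroring the Person classes above. Only SchoolKey.getId() is actually required by the join code; the other field names (such as School.name) are assumptions, not taken from the original project.

// Hypothetical School/SchoolKey POJOs and Serdes, mirroring the Person classes above.
public class SchoolKey {

    private int id; // used by schoolKey.getId() in the join code

    public SchoolKey() {
    }

    public SchoolKey(int id) {
        this.id = id;
    }

    @JsonGetter("id")
    public int getId() {
        return id;
    }

    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }
}

public class School {

    private String name; // assumed field

    public School() {
    }

    public School(String name) {
        this.name = name;
    }

    @JsonGetter("name")
    public String getName() {
        return name;
    }

    @JsonSetter("name")
    public void setName(String name) {
        this.name = name;
    }
}

public class SchoolKeySerde extends Serdes.WrapperSerde<SchoolKey> {

    public SchoolKeySerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(SchoolKey.class));
    }
}

public class SchoolSerde extends Serdes.WrapperSerde<School> {

    public SchoolSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(School.class));
    }
}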
application.yml
spring.application.name: stream-join-sample

spring.cloud.stream.bindings.school:
  destination: school
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.school:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_1

spring.cloud.stream.bindings.person:
  destination: person
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.person:
  consumer:
    keySerde: kafka.streams.serde.PersonKeySerde
    valueSerde: kafka.streams.serde.PersonSerde
    application-id: stream-join-sample_2

spring.cloud.stream.bindings.school_1:
  destination: school
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.school_1:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_2

spring.cloud.stream.kafka.streams.binder:
  brokers: localhost
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 100
Sample application with steps to reproduce:
Could it be that there is some stale data somewhere in the topics or in the underlying changelog topics? Can you try with new topics and a different application-id and see if that resolves your issue?
Here is a sample configuration to use:
spring.cloud.stream.bindings.school:
  destination: school-abc
spring.cloud.stream.kafka.streams.bindings.school:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_diff_id_1

spring.cloud.stream.bindings.person:
  destination: person-abc
spring.cloud.stream.kafka.streams.bindings.person:
  consumer:
    keySerde: kafka.streams.serde.PersonKeySerde
    valueSerde: kafka.streams.serde.PersonSerde
    application-id: stream-join-sample_diff_id_2

spring.cloud.stream.bindings.school_1:
  destination: school-abc
spring.cloud.stream.kafka.streams.bindings.school_1:
  consumer:
    keySerde: kafka.streams.serde.SchoolKeySerde
    valueSerde: kafka.streams.serde.SchoolSerde
    application-id: stream-join-sample_diff_id_2

spring.cloud.stream.kafka.streams.binder:
  brokers: localhost
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 100
Note that I changed the topic names, application IDs, etc. You may want to update any producers that populate these topics.
Also note that you don't need to specify the content type, set useNativeDecoding to false, etc., since those are the defaults in the current versions of the Kafka Streams binder.
I downloaded your code from GitHub to dig into it, and it turns out this is actually a bug in the JsonSerializer/JsonDeserializer being used. The types (School, Person, PersonKey, SchoolKey) are encoded in the record headers, but the headers are never cleaned up. Whenever the type changes, a new header is simply appended (header keys are not unique, and duplicates are allowed).
For some records the same type just happens to be encoded multiple times, so that part of the code works. In other cases, however, different types are encoded, and one of them is picked "randomly" (the first header found) when the data is read back from the topic. This happens before the join, when the data is received from the repartition topic. If the wrong type is picked, the code crashes with a ClassCastException.
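To observe the duplicated type headers yourself, a small diagnostic sketch (not part of the original application) can be dropped into process1 before the join; it only logs each record's headers and forwards the value unchanged. It uses the same ProcessorContext.headers() API as the workaround code below; the header name "__TypeId__" mentioned in the comment is the spring-kafka default and is an assumption here.

// Diagnostic only: log every header on the "person" records before the join.
// Requires: org.apache.kafka.common.header.Header, org.apache.kafka.streams.kstream.ValueTransformer,
// org.apache.kafka.streams.kstream.ValueTransformerSupplier, org.apache.kafka.streams.processor.ProcessorContext
persons.transformValues(new ValueTransformerSupplier<Person, Person>() {
    @Override
    public ValueTransformer<Person, Person> get() {
        return new ValueTransformer<Person, Person>() {
            ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public Person transform(final Person value) {
                // duplicated type headers (typically "__TypeId__") show up here
                for (Header h : context.headers().toArray()) {
                    System.out.println("header " + h.key() + " = "
                            + (h.value() == null ? "null" : new String(h.value())));
                }
                return value;
            }

            @Override
            public void close() {}
        };
    }
});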
New answer:
Based on the discussion on this ticket, https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/685, you should disable writing the type information into the record headers via:
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
Note that all manually created Serdes, i.e., those created by calling new, must be configured manually:
Map<String, Object> config = new HashMap<>();
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
PersonKeySerde personKeySerde = new PersonKeySerde();
personKeySerde.configure(config, true);
PersonSerde personSerde = new PersonSerde();
personSerde.configure(config, false);
// ...
.through("person", Produced.with(personKeySerde, personSerde));
Original answer:
As a workaround, you can replace map() and selectKey() with transform() and clear the headers inside transform(). This is a hack, though. You should still file a ticket against the Spring Boot project so they can fix the JsonSerializer/JsonDeserializer.
The following code removes the headers and ensures that the correct types are used, avoiding the ClassCastException:
@SpringBootApplication
public class KafkaStreamJoinApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamJoinApplication.class, args);
    }

    @EnableBinding(KStreamProcessorX.class)
    public static class KafkaKStreamJoinApplication {

        @StreamListener
        public void process(@Input("school") KStream<SchoolKey, School> schools) {
            // replace map() with transform()
            schools.transform(new TransformerSupplier<SchoolKey, School, KeyValue<PersonKey, Person>>() {
                        @Override
                        public Transformer<SchoolKey, School, KeyValue<PersonKey, Person>> get() {
                            return new Transformer<SchoolKey, School, KeyValue<PersonKey, Person>>() {
                                ProcessorContext context;

                                @Override
                                public void init(final ProcessorContext context) {
                                    this.context = context;
                                }

                                @Override
                                public KeyValue<PersonKey, Person> transform(final SchoolKey key, final School value) {
                                    // clear all headers; it would be sufficient to remove only the type header
                                    for (Header h : context.headers().toArray()) {
                                        context.headers().remove(h.key());
                                    }
                                    // same as the "old" map() code:
                                    return KeyValue.pair(new PersonKey("Adam", "Smith", key.getId()), new Person(12));
                                }

                                @Override
                                public void close() {}
                            };
                        }
                    })
                    .through("person", Produced.with(new PersonKeySerde(), new PersonSerde()));
        }

        @StreamListener
        public void process1(@Input("school_1") KStream<SchoolKey, School> schools, @Input("person") KStream<PersonKey, Person> persons) {
            // replace selectKey() with transform()
            schools.transform(new TransformerSupplier<SchoolKey, School, KeyValue<Integer, School>>() {
                        @Override
                        public Transformer<SchoolKey, School, KeyValue<Integer, School>> get() {
                            return new Transformer<SchoolKey, School, KeyValue<Integer, School>>() {
                                ProcessorContext context;

                                @Override
                                public void init(final ProcessorContext context) {
                                    this.context = context;
                                }

                                @Override
                                public KeyValue<Integer, School> transform(final SchoolKey key, final School value) {
                                    // clear all headers; it would be sufficient to remove only the type header
                                    for (Header h : context.headers().toArray()) {
                                        context.headers().remove(h.key());
                                    }
                                    // effectively the same as the "old" selectKey() code:
                                    return KeyValue.pair(key.getId(), value);
                                }

                                @Override
                                public void close() {}
                            };
                        }
                    })
                    // replace selectKey() with transform()
                    .join(persons.transform(new TransformerSupplier<PersonKey, Person, KeyValue<Integer, Person>>() {
                                @Override
                                public Transformer<PersonKey, Person, KeyValue<Integer, Person>> get() {
                                    return new Transformer<PersonKey, Person, KeyValue<Integer, Person>>() {
                                        ProcessorContext context;

                                        @Override
                                        public void init(final ProcessorContext context) {
                                            this.context = context;
                                        }

                                        @Override
                                        public KeyValue<Integer, Person> transform(final PersonKey key, final Person value) {
                                            // clear all headers; it would be sufficient to remove only the type header
                                            for (Header h : context.headers().toArray()) {
                                                context.headers().remove(h.key());
                                            }
                                            // effectively the same as the "old" selectKey() code:
                                            return KeyValue.pair(key.getId(), value);
                                        }

                                        @Override
                                        public void close() {}
                                    };
                                }
                            }),
                            (school, person) -> {
                                System.out.println("school_app2= " + school + ", person_app2= " + person);
                                return null;
                            },
                            JoinWindows.of(Duration.ofSeconds(1)),
                            Joined.with(Serdes.Integer(), new SchoolSerde(), new PersonSerde())
                    );
        }
    }

    interface KStreamProcessorX {

        @Input("person")
        KStream<?, ?> inputPersonKStream();

        @Input("school")
        KStream<?, ?> inputSchoolKStream();

        @Input("school_1")
        KStream<?, ?> inputSchool1KStream();
    }
}