Can a record processor be a Spring singleton bean?
I am using spring-kafka to implement a topology that converts lower case to upper case, like this:
@Bean
public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
    KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
    // A new processor object is created here per record
    sourceStream.process(() -> new CapitalCaseProcessor());
    ...
}
The processor is not a Spring singleton bean and is declared as follows:
public class CapitalCaseProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }
}
The processor above is stateful: it holds the ProcessorContext as state.
Now, what happens if we turn the stateful CapitalCaseProcessor into a Spring singleton bean?
@Component
public class CapitalCaseProcessor implements Processor<String, String> {

    // Is the ProcessorContext going to have a thread-safety issue now?
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }
}
and try to inject it into the main topology as a Spring bean:
@Configuration
public class UppercaseTopologyProcessor {

    @Autowired CapitalCaseProcessor capitalCaseProcessor;

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        // A singleton spring bean processor is now used for all the records
        sourceStream.process(() -> capitalCaseProcessor);
        ...
    }
Since it now holds the ProcessorContext as state, will that cause thread-safety problems in CapitalCaseProcessor?
Or is it better to declare it as a prototype bean, like this?
@Configuration
public class UppercaseTopologyProcessor {

    @Lookup
    public CapitalCaseProcessor getCapitalCaseProcessor() { return null; }

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        // A new processor instance is now looked up for each ProcessorSupplier.get() call
        sourceStream.process(() -> getCapitalCaseProcessor());
        ...
    }
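For the @Lookup approach to actually hand out a fresh instance on every call, CapitalCaseProcessor would also have to be declared as a prototype-scoped bean. A minimal sketch of what I mean (the @Scope annotation is an assumption, it is not in my current code):

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class CapitalCaseProcessor implements Processor<String, String> {
    // same init()/process() as above; every getCapitalCaseProcessor() call
    // now returns a separate instance, each with its own ProcessorContext field
}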
Update: I basically want to know two things:
- Should a processor instance be tied to each stream record, like the Akka actor model where actors are stateful and work per request, or can it be a singleton object?
- Is ProcessorContext thread-safe?
I just ran a test: the processor context is NOT thread-safe. What makes the stream thread-safe is that you use a ProcessorSupplier (in your first example) to create a new processor instance for each record.
You certainly cannot replace that with a Spring singleton.
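If the processor has to stay a Spring-managed bean, the supplier can still ask the container for a fresh prototype instance on every get() call. A minimal sketch, assuming CapitalCaseProcessor is declared with prototype scope and reusing the inputTopic field from the question (the ObjectProvider wiring is an illustration, not something from the question):

@Configuration
public class UppercaseTopologyProcessor {

    // Spring injects a provider; each getObject() call creates a new prototype instance
    @Autowired
    private ObjectProvider<CapitalCaseProcessor> processorProvider;

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream =
                builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        // the ProcessorSupplier still hands Kafka Streams a fresh processor per get()
        sourceStream.process(() -> processorProvider.getObject());
        return sourceStream;
    }
}

Functionally this is the same idea as the @Lookup version in the question: what matters is that ProcessorSupplier.get() never returns the same instance twice.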
Here is my test, using the MessagingTransformer provided by Spring for Apache Kafka:
@SpringBootApplication
@EnableKafkaStreams
public class So66200448Application {

    private static final Logger log = LoggerFactory.getLogger(So66200448Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So66200448Application.class, args);
    }

    @Bean
    KStream<String, String> stream(StreamsBuilder sb) {
        KStream<String, String> stream = sb.stream("so66200448");
        stream.transform(() -> new MessagingTransformer(msg -> {
            log.info(msg.toString());
            log.info(new String(msg.getHeaders().get("foo", byte[].class)));
            return msg;
        }, new MessagingMessageConverter()) {

            @Override
            public KeyValue transform(Object key, Object value) {
                try {
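                    // deliberate delay so both stream threads are inside transform() at the same time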
                    Thread.sleep(5000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return super.transform(key, value);
            }

        })
        .to("so66200448out");
        return stream;
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so66200448").partitions(2).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66200448out").partitions(2).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            Headers headers = new RecordHeaders();
            headers.add(new RecordHeader("foo", "bar".getBytes()));
            ProducerRecord<String, String> record = new ProducerRecord<>("so66200448", 0, null, "foo", headers);
            template.send(record);
            headers.remove("foo");
            headers.add(new RecordHeader("foo", "baz".getBytes()));
            record = new ProducerRecord<>("so66200448", 1, null, "bar", headers);
            template.send(record);
        };
    }

    @KafkaListener(id = "so66200448out", topics = "so66200448out")
    public void listen(String in) {
        System.out.println(in);
    }

}
spring.kafka.streams.application-id=so66200448
spring.kafka.streams.properties.num.stream.threads=2
spring.kafka.consumer.auto-offset-reset=earliest
2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-1] com.example.demo.So66200448Application : bar
2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-2] com.example.demo.So66200448Application : baz
Changing the supplier to return the same instance every time definitely breaks it:
@Bean
KStream<String, String> stream(StreamsBuilder sb) {
    KStream<String, String> stream = sb.stream("so66200448");
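    // a single transformer instance, shared by every stream thread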
    MessagingTransformer transformer = new MessagingTransformer(msg -> {
        log.info(msg.toString());
        log.info(new String(msg.getHeaders().get("foo", byte[].class)));
        return msg;
    }, new MessagingMessageConverter()) {

        @Override
        public KeyValue transform(Object key, Object value) {
            try {
                Thread.sleep(5000);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return super.transform(key, value);
        }

    };
    stream.transform(() -> transformer)
            .to("so66200448out");
    return stream;
}
2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-1] com.example.demo.So66200448Application : baz
2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-2] com.example.demo.So66200448Application : baz
So, for thread safety, the stream relies on getting a new instance each time. With a single shared instance, each stream thread's init() call overwrites the stored ProcessorContext, so one thread ends up reading the other thread's headers, which is exactly the duplicated baz shown above.