Can a record processor be a Spring singleton bean?

I am using spring-kafka to implement a topology that converts lowercase text to uppercase, as follows:

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {

        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // A new processor object is created here each time the supplier is invoked
        sourceStream.process(() -> new CapitalCaseProcessor());
        ...
    }

The processor is not a Spring singleton bean and is declared as follows:

public class CapitalCaseProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }
}

The processor above is stateful: it holds the processor context as state.

Now, what happens if we convert the stateful CapitalCaseProcessor into a Spring singleton bean?

@Component
public class CapitalCaseProcessor implements Processor<String, String> {
    
    //Is the ProcessorContext going to have thread safety issue now?
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }
}

and try to inject it into the main topology as a Spring bean:

@Configuration
public class UppercaseTopologyProcessor {

    @Autowired
    CapitalCaseProcessor capitalCaseProcessor;

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {

        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // A singleton spring bean processor is now used for all the records 
        sourceStream.process(() -> capitalCaseProcessor);
        ...
    }
}

Since it now holds the ProcessorContext as state, will CapitalCaseProcessor have thread-safety issues?

Or would it be better to declare it as a prototype bean, like this?

@Configuration
public class UppercaseTopologyProcessor {

    @Lookup
    public CapitalCaseProcessor getCapitalCaseProcessor() { return null; }

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {

        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // A new prototype-scoped bean is looked up each time the supplier is invoked
        sourceStream.process(() -> getCapitalCaseProcessor());
        ...
    }
}

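Update: I basically want to know two things:

  1. Should a processor instance be tied to a single stream record, as in the Akka actor model where actors are stateful and work per request, or can it be a singleton object?
  2. Is ProcessorContext thread-safe?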

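I just ran a test: the processor context is not thread-safe. What makes the stream thread-safe is that you use a ProcessorSupplier (as in your first example) to create a new processor instance each time the supplier is called. Kafka Streams calls the supplier when it builds each stream task, not once per record.

You definitely must not replace it with a Spring singleton.

Here is my test, using the MessagingTransformer provided by Spring for Apache Kafka: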

@SpringBootApplication
@EnableKafkaStreams
public class So66200448Application {


    private static final Logger log = LoggerFactory.getLogger(So66200448Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So66200448Application.class, args);
    }

    @Bean
    KStream<String, String> stream(StreamsBuilder sb) {
        KStream<String, String> stream = sb.stream("so66200448");

        stream.transform(() ->  new MessagingTransformer(msg -> {
            log.info(msg.toString());
            log.info(new String(msg.getHeaders().get("foo", byte[].class)));
            return msg;
        }, new MessagingMessageConverter()) {

            @Override
            public KeyValue transform(Object key, Object value) {
                try {
                    Thread.sleep(5000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return super.transform(key, value);
            }

        })
            .to("so66200448out");
        return stream;
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so66200448").partitions(2).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66200448out").partitions(2).replicas(1).build();
    }


    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            Headers headers = new RecordHeaders();
            headers.add(new RecordHeader("foo", "bar".getBytes()));
            ProducerRecord<String, String> record = new ProducerRecord<>("so66200448", 0, null, "foo", headers);
            template.send(record);
            headers.remove("foo");
            headers.add(new RecordHeader("foo", "baz".getBytes()));
            record = new ProducerRecord<>("so66200448", 1, null, "bar", headers);
            template.send(record);
        };
    }

    @KafkaListener(id = "so66200448out", topics = "so66200448out")
    public void listen(String in) {
        System.out.println(in);
    }

}

spring.kafka.streams.application-id=so66200448
spring.kafka.streams.properties.num.stream.threads=2
spring.kafka.consumer.auto-offset-reset=earliest

2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-1] com.example.demo.So66200448Application : bar

2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-2] com.example.demo.So66200448Application : baz

Changing the supplier to return the same instance every time definitely breaks it:

@Bean
KStream<String, String> stream(StreamsBuilder sb) {
    KStream<String, String> stream = sb.stream("so66200448");
    MessagingTransformer transformer = new MessagingTransformer(msg -> {
        log.info(msg.toString());
        log.info(new String(msg.getHeaders().get("foo", byte[].class)));
        return msg;
    }, new MessagingMessageConverter()) {

        @Override
        public KeyValue transform(Object key, Object value) {
            try {
                Thread.sleep(5000);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return super.transform(key, value);
        }

    };
    stream.transform(() ->  transformer)
        .to("so66200448out");
    return stream;
}

2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-1] com.example.demo.So66200448Application : baz

2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-2] com.example.demo.So66200448Application : baz

So, for thread safety, Streams relies on getting a new instance each time.
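
The same failure mode can be reproduced without Kafka. The following is only a minimal sketch using plain Java threads, not Kafka Streams code: `DemoProcessor` and `SharedContextDemo` are hypothetical names, and a `String` stands in for the `ProcessorContext`. Two threads each obtain a processor from a supplier, initialize it with their own context, and then process one record. The field is declared `volatile` purely so the outcome is deterministic; the problem being illustrated is sharing one instance, not visibility.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

// Hypothetical stand-in for a processor that keeps per-task context in a field.
class DemoProcessor {
    private volatile String context; // plays the role of ProcessorContext

    void init(String context) {
        this.context = context;
    }

    // Returns the context this processor sees while "processing" a record.
    String process() {
        return context;
    }
}

class SharedContextDemo {

    // Simulates two stream threads: each asks the supplier for a processor,
    // initializes it with its own context, waits until both are initialized,
    // and then records which context its processor sees.
    static List<String> run(Supplier<DemoProcessor> supplier) {
        String[] contexts = {"bar", "baz"};
        String[] seen = new String[contexts.length];
        CountDownLatch allInitialized = new CountDownLatch(contexts.length);
        Thread[] threads = new Thread[contexts.length];
        for (int i = 0; i < contexts.length; i++) {
            final int task = i;
            threads[i] = new Thread(() -> {
                DemoProcessor processor = supplier.get();
                processor.init(contexts[task]);
                allInitialized.countDown();
                try {
                    allInitialized.await(); // process only after both init calls
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                seen[task] = processor.process();
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return Arrays.asList(seen);
    }

    public static void main(String[] args) {
        // New instance per call: each thread keeps its own context.
        System.out.println(run(DemoProcessor::new)); // prints [bar, baz]

        // Same instance for every call: the second init overwrites the first,
        // so both threads see the same context.
        DemoProcessor shared = new DemoProcessor();
        System.out.println(run(() -> shared));
    }
}
```

With a fresh instance per call, each thread sees its own context. With the shared instance, both threads report the same value (whichever `init` call ran last), which mirrors the `baz` / `baz` log output above.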