使用 Apache camel 将 MyObject 传递给 bean 路由的可能性?

Possibility of passing MyObject to bean routing using Apache camel?

我正在使用 apache camel JAVA DSL 消费来自 Apache kafka 的消息。 我正在通过在 kafka 上将它转换为 byte[] 来编写一个对象。当我使用它时,我收到一条返回 byte[] 的消息。我反序列化它并得到一个对象。

我检查它是否是 MyObject 的对象然后需要使用 java DSL .to() 将它传递给 bean。我的代码如下:

public class KafkaRouter extends RouteBuilder {
    
    private MessageBean msgBean;
    
    @Override
    public void configure() throws Exception {
        
        from("{{kafka.cons.uri}}").process(new Processor() {
            
            
            
            @Override
            public void process(Exchange exchange) throws Exception {
                Object obj = SerializationUtils.deserialize(exchange.getIn().getBody(byte[].class));                //TODO cast to specific class as returned after deserialization.
                
                if(obj !=null && obj instanceof MessageBean){
                    
                    msgBean = (MessageBean)obj;
                    
                }
                else {
                    
                    throw new PTFException("Invalid Message read in Kafka Consumer");
                }
                
            }
            
            
        }).bean(PTFTransformerService.class,"callTransformerService(msgBean)"); ;
    }

现在的问题是我只想在相应的调用方法中使用 MyObject 而我不想使用 TypeConvertors。我不想在方法中获取 Exchange/body 我将在进程中处理我的流并在读取无效消息时抛出异常并且不将其转发给 bean。

我在另一端的方法是:

private void callTransformerService(MessageBean msgObj){
    // Got my object here ;-)   
        
    }

在函数参数前添加@Body MessageBean msgObj:

import org.apache.camel.Body;

private void callTransformerService(@Body MessageBean msgObj){

}

您可能需要编写一个自定义回退类型转换器,可以将 kafka byte[] 转换为您的 POJO

然后你可以在bean中定义pojo类型,Camel会使用回退类型转换器尝试转换成pojo类型。

您可以在处理器中设置 Exchange Body,如下所示:

msgBean = (MessageBean)obj;
exchange.getIn().setBody(msgBean, MessageBean.class);
void callTransformerService(@Body MessageBean msgObj) {}