修改消息body,或者其他方式修改骆驼路线中的一些数据

Modify the message body, or other wise modify some data in a camel route

对 Camel 路由中的某些数据进行小幅修改的最佳方法是什么?

我正在从 Mongo 中提取 BSON 文档。我需要在 http 调用中使用它的时间戳,但我需要将它从毫秒转换为秒。

我尝试设置 header。

.setHeader("test").jsonpath("$.startTime")

这让我可以使用简单表达式将时间戳添加到 URL。

.toD("https://test.com/api/markets?resolution=60&start_time=${headers.test}")

但是我找不到修改 header 的值的方法。

我也试过使用进程

.process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        DocumentContext message = JsonPath.parse(exchange.getMessage().getBody());
        String time = message.read("$.startTime").toString();
        time = "111100000";  
        // do something with the payload and/or exchange here
       //exchange.getIn().setBody("Changed body");
   }
})

但是这里的交换没有传回。我基于我如何使用一个丰富的 EIP,以及一个聚合策略来返回一个包含我所做的更改的交换。这个过程似乎不是那样工作的。

您可以使用 Lambda、处理器或 bean 修改 body、header 或 属性。对于处理器,您需要使用 Message.setHeader 方法来修改 header 的值,至少对于值类型和字符串。 Bean 方法默认接收 body 值,因此如果您想传入 header,您需要使用简单的语言指定它。

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class SetHeaderTest extends CamelTestSupport {

    @Test
    public void testGreeting() throws Exception {

        MockEndpoint resultMockEndpoint = getMockEndpoint("mock:result");
        resultMockEndpoint.expectedMessageCount(3);

        template.sendBodyAndHeader("direct:modifyGreetingLambda", 
            null, "greeting", "Hello");

        template.sendBodyAndHeader("direct:modifyGreetingProcessor", 
            null, "greeting", "Hello");

        template.sendBodyAndHeader("direct:modifyGreetingBean", 
            null, "greeting", "Hello");
    
        resultMockEndpoint.assertIsSatisfied();
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder(){

            @Override
            public void configure() throws Exception {
                
                from("direct:modifyGreetingLambda")
                    .routeId("modifyGreetingLambda")
                    .setHeader("greeting").exchange(exchange -> {   
                        String modifiedGreeting = (String)exchange.getMessage().getHeader("greeting");
                        modifiedGreeting += " world!";
                        return modifiedGreeting;
                    })
                    .log("${headers.greeting}")
                    .to("mock:result");

                from("direct:modifyGreetingProcessor")
                    .routeId("modifyGreetingProcessor")
                    .process(new Processor(){

                        @Override
                        public void process(Exchange exchange) throws Exception {
                            String modifiedGreeting = (String)exchange.getMessage().getHeader("greeting");
                            modifiedGreeting += " world!";
                            exchange.getMessage().setHeader("greeting", modifiedGreeting);
                        }
                    })
                    .log("${headers.greeting}")
                    .to("mock:result");

                from("direct:modifyGreetingBean")
                    .routeId("modifyGreetingBean")
                    .setHeader("greeting").method(new ModifyGreetingBean(), 
                        "modifyGreeting('${headers.greeting}')")
                    .log("${headers.greeting}")
                    .to("mock:result");
            }
        };
    }

    public class ModifyGreetingBean {
        public String modifyGreeting(String greeting) {
            return greeting + " world!";
        }
    }
}

除此之外,您还可以使用简单或 groovy 等表达式语言。

在路由中你可以设置 header 和毫秒值 .setHeader("test").jsonpath("$.startTime")

然后在处理器中您可以检索此值:

String milliSecondsValue = (String) exchange.getIn().getHeader("test");

然后你 transform 将 milliSecondsValue 设置为你想要的值,然后将其设置回交换:

exchange.getIn().setHeader("test", secondsValue);

之后调用 .toD("https://test.com/api/markets?resolution=60&start_time=${header.test}") 它将使用秒值