Camel - 使用 Java DSL 通过动态路由发出并行 GET 请求并聚合结果

Camel - Making parallel GET requests and aggregating the results via Dynamic Routes using Java DSL

我正在 Jetty Http 端点上收到请求。请求正文包含请求正文中的一些 url。我必须向这些网址发出 GET 请求。然后将每个 GET 请求和 return 请求的结果聚合到调用方。

请求正文:-

{
    "data" : [
        {"name" : "Hello", "url" : "http://server1"}
        {"name" : "Hello2", "url" : "http://server2"}

    ]
}

我能想到的一种方法如下所示:-

from("jetty:http://localhost:8888/hello").process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        // 1. Make the GET request in parallel using ThreadPoolExecutor
        // 2. Wait for all calls to finish. Collate the response
        // 3. Write it to exchange.getOut().setBody
    }
})

有人可以告诉我这是否可以通过 Java DSL 使用骆驼动态路由、拆分器和聚合器来实现,以便 Processor 保持相对较小?

我正在使用 camel 2.16.3。

步骤为:

  1. 将传入数据拆分为 URL 秒。这可能涉及子步骤:对于 例如,您可以将传入的 JSON 字符串解组到某个 POJO 中,也许这个 POJO 有一个数组或列表,其中每个条目都是一个 URL。然后,您可以将其作为正文传递。 (当然,您可能需要来自传入请求的其他信息,因此您可以改变它。)
  2. 如果正文是数组或其他它可以轻松处理的东西,拆分器将很容易拆分。在最简单的情况下,拆分器将在每个拆分消息的正文中传递一个 URI。
  3. 下一步——在拆分器的流程中——你可以有一个像 http4 这样的生产者,但你可以要求它使用消息中的 URI,而不是在端点上使用 URI。
  4. 最后,您将拥有一个聚合器。

听起来你问题的核心是关于动态 URI。代码片段可能是这样的:

 from(...)... etc. 
    .setHeader(Exchange.HTTP_URI, simple("${body}"))
    .setHeader(Exchange.HTTP_METHOD,  
              constant(org.apache.camel.component.http4.HttpMethods.GET))
    .to("http4://google.com")

对于小型工作演示,see this class

public class HttpDynamicClient extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("direct:testMultiple")
        .split(body())
        .to("direct:httpClient");

        from("direct:httpClient")
        .log("starting httpClient route")
        .setHeader(Exchange.HTTP_URI, simple("${body}"))
        .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.GET))
        .to("http4://google.com")
        .process(new BodyToStringConverter())
        .log(LoggingLevel.INFO, "Output was ${body}");
    }

    private static class BodyToStringConverter implements Processor {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getOut().setBody(exchange.getIn().getBody(String.class));
        }
    }


    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        try {
            Logger logger = Logger.getLogger(HttpDynamicClient.class);

            context.addRoutes(new HttpDynamicClient());
            ProducerTemplate template = context.createProducerTemplate();
            context.start();
            Thread.sleep(1000);


            template.sendBody("direct:httpClient", "http://jsonplaceholder.typicode.com/posts/1");
            Thread.sleep(2000);

            template.sendBody("direct:testMultiple", new String [] {"http://jsonplaceholder.typicode.com/posts/1" , "http://jsonplaceholder.typicode.com/posts/1"});


        } finally {
            context.stop();
        }
    }

}

@Darius X. 的答案正是您所需要的。要使后端请求并行执行并将响应主体聚合到字符串列表中,您需要设置聚合策略并在拆分定义上设置并行处理标志。

@Override
public void configure() throws Exception {

    from("direct:testMultiple")
            .split(body(), new FlexibleAggregationStrategy<String>()
                    .pick(Builder.body())
                    .castAs(String.class)
                    .accumulateInCollection(ArrayList.class))
            .parallelProcessing()
            // .executorService(<instance of java.util.concurrent.ExecutorService>) // optional: use custom thread pool for parallel processing
            .to("direct:httpClient");

    from("direct:httpClient")
            .log("starting httpClient route")
            .setHeader(Exchange.HTTP_URI, simple("${body}"))
            .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.GET))
            .to("http4://google.com")
            .convertBodyTo(String.class)
            .log(LoggingLevel.INFO, "Output was ${body}");
}

direct:testMultiple返回的交换输出消息将包含您的结果数组作为正文。