Camel - 使用 Java DSL 通过动态路由发出并行 GET 请求并聚合结果
Camel - Making parallel GET requests and aggregating the results via Dynamic Routes using Java DSL
我正在 Jetty Http 端点上收到请求。请求正文包含请求正文中的一些 url。我必须向这些网址发出 GET
请求。然后将每个 GET
请求和 return 请求的结果聚合到调用方。
请求正文:-
{
"data" : [
{"name" : "Hello", "url" : "http://server1"}
{"name" : "Hello2", "url" : "http://server2"}
]
}
我能想到的一种方法如下所示:-
from("jetty:http://localhost:8888/hello").process(new Processor() {
public void process(Exchange exchange) throws Exception {
// 1. Make the GET request in parallel using ThreadPoolExecutor
// 2. Wait for all calls to finish. Collate the response
// 3. Write it to exchange.getOut().setBody
}
})
有人可以告诉我这是否可以通过 Java DSL 使用骆驼动态路由、拆分器和聚合器来实现,以便 Processor
保持相对较小?
我正在使用 camel 2.16.3。
步骤为:
- 将传入数据拆分为 URL 秒。这可能涉及子步骤:对于
例如,您可以将传入的 JSON 字符串解组到某个 POJO 中,也许这个 POJO 有一个数组或列表,其中每个条目都是一个 URL。然后,您可以将其作为正文传递。 (当然,您可能需要来自传入请求的其他信息,因此您可以改变它。)
- 如果正文是数组或其他它可以轻松处理的东西,拆分器将很容易拆分。在最简单的情况下,拆分器将在每个拆分消息的正文中传递一个 URI。
- 下一步——在拆分器的流程中——你可以有一个像 http4 这样的生产者,但你可以要求它使用消息中的 URI,而不是在端点上使用 URI。
- 最后,您将拥有一个聚合器。
听起来你问题的核心是关于动态 URI。代码片段可能是这样的:
from(...)... etc.
.setHeader(Exchange.HTTP_URI, simple("${body}"))
.setHeader(Exchange.HTTP_METHOD,
constant(org.apache.camel.component.http4.HttpMethods.GET))
.to("http4://google.com")
对于小型工作演示,see this class。
public class HttpDynamicClient extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:testMultiple")
.split(body())
.to("direct:httpClient");
from("direct:httpClient")
.log("starting httpClient route")
.setHeader(Exchange.HTTP_URI, simple("${body}"))
.setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.GET))
.to("http4://google.com")
.process(new BodyToStringConverter())
.log(LoggingLevel.INFO, "Output was ${body}");
}
private static class BodyToStringConverter implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getOut().setBody(exchange.getIn().getBody(String.class));
}
}
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
try {
Logger logger = Logger.getLogger(HttpDynamicClient.class);
context.addRoutes(new HttpDynamicClient());
ProducerTemplate template = context.createProducerTemplate();
context.start();
Thread.sleep(1000);
template.sendBody("direct:httpClient", "http://jsonplaceholder.typicode.com/posts/1");
Thread.sleep(2000);
template.sendBody("direct:testMultiple", new String [] {"http://jsonplaceholder.typicode.com/posts/1" , "http://jsonplaceholder.typicode.com/posts/1"});
} finally {
context.stop();
}
}
}
@Darius X. 的答案正是您所需要的。要使后端请求并行执行并将响应主体聚合到字符串列表中,您需要设置聚合策略并在拆分定义上设置并行处理标志。
@Override
public void configure() throws Exception {
from("direct:testMultiple")
.split(body(), new FlexibleAggregationStrategy<String>()
.pick(Builder.body())
.castAs(String.class)
.accumulateInCollection(ArrayList.class))
.parallelProcessing()
// .executorService(<instance of java.util.concurrent.ExecutorService>) // optional: use custom thread pool for parallel processing
.to("direct:httpClient");
from("direct:httpClient")
.log("starting httpClient route")
.setHeader(Exchange.HTTP_URI, simple("${body}"))
.setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.GET))
.to("http4://google.com")
.convertBodyTo(String.class)
.log(LoggingLevel.INFO, "Output was ${body}");
}
direct:testMultiple
返回的交换输出消息将包含您的结果数组作为正文。
我正在 Jetty Http 端点上收到请求。请求正文包含请求正文中的一些 url。我必须向这些网址发出 GET
请求。然后将每个 GET
请求和 return 请求的结果聚合到调用方。
请求正文:-
{
"data" : [
{"name" : "Hello", "url" : "http://server1"}
{"name" : "Hello2", "url" : "http://server2"}
]
}
我能想到的一种方法如下所示:-
from("jetty:http://localhost:8888/hello").process(new Processor() {
public void process(Exchange exchange) throws Exception {
// 1. Make the GET request in parallel using ThreadPoolExecutor
// 2. Wait for all calls to finish. Collate the response
// 3. Write it to exchange.getOut().setBody
}
})
有人可以告诉我这是否可以通过 Java DSL 使用骆驼动态路由、拆分器和聚合器来实现,以便 Processor
保持相对较小?
我正在使用 camel 2.16.3。
步骤为:
- 将传入数据拆分为 URL 秒。这可能涉及子步骤:对于 例如,您可以将传入的 JSON 字符串解组到某个 POJO 中,也许这个 POJO 有一个数组或列表,其中每个条目都是一个 URL。然后,您可以将其作为正文传递。 (当然,您可能需要来自传入请求的其他信息,因此您可以改变它。)
- 如果正文是数组或其他它可以轻松处理的东西,拆分器将很容易拆分。在最简单的情况下,拆分器将在每个拆分消息的正文中传递一个 URI。
- 下一步——在拆分器的流程中——你可以有一个像 http4 这样的生产者,但你可以要求它使用消息中的 URI,而不是在端点上使用 URI。
- 最后,您将拥有一个聚合器。
听起来你问题的核心是关于动态 URI。代码片段可能是这样的:
from(...)... etc.
.setHeader(Exchange.HTTP_URI, simple("${body}"))
.setHeader(Exchange.HTTP_METHOD,
constant(org.apache.camel.component.http4.HttpMethods.GET))
.to("http4://google.com")
对于小型工作演示,see this class。
public class HttpDynamicClient extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:testMultiple")
.split(body())
.to("direct:httpClient");
from("direct:httpClient")
.log("starting httpClient route")
.setHeader(Exchange.HTTP_URI, simple("${body}"))
.setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.GET))
.to("http4://google.com")
.process(new BodyToStringConverter())
.log(LoggingLevel.INFO, "Output was ${body}");
}
private static class BodyToStringConverter implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getOut().setBody(exchange.getIn().getBody(String.class));
}
}
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
try {
Logger logger = Logger.getLogger(HttpDynamicClient.class);
context.addRoutes(new HttpDynamicClient());
ProducerTemplate template = context.createProducerTemplate();
context.start();
Thread.sleep(1000);
template.sendBody("direct:httpClient", "http://jsonplaceholder.typicode.com/posts/1");
Thread.sleep(2000);
template.sendBody("direct:testMultiple", new String [] {"http://jsonplaceholder.typicode.com/posts/1" , "http://jsonplaceholder.typicode.com/posts/1"});
} finally {
context.stop();
}
}
}
@Darius X. 的答案正是您所需要的。要使后端请求并行执行并将响应主体聚合到字符串列表中,您需要设置聚合策略并在拆分定义上设置并行处理标志。
@Override
public void configure() throws Exception {
from("direct:testMultiple")
.split(body(), new FlexibleAggregationStrategy<String>()
.pick(Builder.body())
.castAs(String.class)
.accumulateInCollection(ArrayList.class))
.parallelProcessing()
// .executorService(<instance of java.util.concurrent.ExecutorService>) // optional: use custom thread pool for parallel processing
.to("direct:httpClient");
from("direct:httpClient")
.log("starting httpClient route")
.setHeader(Exchange.HTTP_URI, simple("${body}"))
.setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.GET))
.to("http4://google.com")
.convertBodyTo(String.class)
.log(LoggingLevel.INFO, "Output was ${body}");
}
direct:testMultiple
返回的交换输出消息将包含您的结果数组作为正文。