骆驼:AWS-S3:文件主体 null ,当使用来自 s3 的文件并传递给 Rest api
Camel: AWS-S3: File body null , when consuming file from s3 and passing to Rest api
当执行 Camel AWS S3 消费者从 S3 读取文件并将它们传递到 Rest End 点失败时,因为文件主体变为空。我使用类似的代码从 sftp 使用文件,它有效。但是当我使用 Aws 端点时,它失败了。 exchange s3对象还有其他设置吗?
当使用来自 s3 的文件并将其传递给 Rest 时,文件主体变为空 API。
代码:
from("aws-s3://test?amazonS3Client=#amazonS3Client&deleteAfterRead=false&delay=5000&synchronous=true&includeBody=true&autocloseBody=false&exchangePattern=InOut")
.convertBodyTo(byte[].class)
.log(LoggingLevel.INFO, "consuming", "Consumer Fired!")
.log(LoggingLevel.INFO, "Replay Message Sent to file:s3out ${in.header.CamelAwsS3Key}")
.filter(simple("${in.header.CamelAwsS3Key} contains 'score_input'"))
// .to("file:target/s3out?fileName=${in.header.CamelAwsS3Key}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
String filename = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
File file = exchange.getIn().getBody(File.class);
multipartEntityBuilder.addPart("file",
new FileBody(file, ContentType.MULTIPART_FORM_DATA, filename));
exchange.getOut().setBody(multipartEntityBuilder.build());
}
})
.to(httpRoute)
错误:
2020-08-02 21:34:57,413 [ws-s3://ds_test] INFO consuming - Consumer Fired!
2020-08-02 21:34:57,414 [ws-s3://ds_test] INFO route1 - Replay Message Sent to file:s3out input_0.csv.gz
2020-08-02 21:34:57,415 [ws-s3://ds_test] ERROR DefaultErrorHandler - Failed delivery for (MessageId: ID-XXXXXX-Mac-49195-1596384290692-0-5 on ExchangeId: ID-XXXXXXX-Mac-49195-1596384290692-0-6). Exhausted after delivery attempt: 1 caught: java.lang.IllegalArgumentException: File may not be null
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [aws-s3://test?amazonS3Client=%23amazonS3Client&autocloseBody=false&delay=50] [ 218]
[route1 ] [convertBodyTo1 ] [convertBodyTo[byte[]] ] [ 216]
[route1 ] [log1 ] [log ] [ 0]
[route1 ] [log2 ] [log ] [ 1]
[route1 ] [filter1 ] [filter[simple{Simple: ${in.header.CamelAwsS3Key} contains 'score_input'}] ] [ 1]
[route1 ] [process1 ] [Processor@0x7aa3628c ] [ 0]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalArgumentException: File may not be null
at org.apache.http.util.Args.notNull(Args.java:54) ~[httpcore-4.4.4.jar:4.4.4]
at org.apache.http.entity.mime.content.FileBody.<init>(FileBody.java:97) ~[httpmime-4.5.1.jar:4.5.1]
at org.apache.camel.example.cdi.aws.s3.Application$AwsS3Route.process(Application.java:101) ~[classes/:?]
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.FilterProcessor.process(FilterProcessor.java:57) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.component.aws.s3.S3Consumer.processBatch(S3Consumer.java:157) [camel-aws-2.18.2.jar:2.18.2]
at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:101) [camel-aws-2.18.2.jar:2.18.2]
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) [camel-core-2.18.2.jar:2.18.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:830) [?:?]
2020-08-02 21:34:57,419 [ws-s3://ds_test] WARN S3Consumer - Exchange failed, so rolling back message status: Exchange[ID-XXXXXX-Mac-49195-1596384290692-0-6]
java.lang.IllegalArgumentException: File may not be null
at org.apache.http.util.Args.notNull(Args.java:54) ~[httpcore-4.4.4.jar:4.4.4]
at org.apache.http.entity.mime.content.FileBody.<init>(FileBody.java:97) ~[httpmime-4.5.1.jar:4.5.1]
at org.apache.camel.example.cdi.aws.s3.Application$AwsS3Route.process(Application.java:101) ~[classes/:?]
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.FilterProcessor.process(FilterProcessor.java:57) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.component.aws.s3.S3Consumer.processBatch(S3Consumer.java:157) [camel-aws-2.18.2.jar:2.18.2]
at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:101) [camel-aws-2.18.2.jar:2.18.2]
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) [camel-core-2.18.2.jar:2.18.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:830) [?:?]
from("timer:CSVReader?period=10s")
.pollEnrich("aws-s3://test?amazonS3Client=#amazonS3Client&deleteAfterRead=false&delay=5000&synchronous=true&includeBody=true&autocloseBody=false&fileName=")
.setHeader("CamelAwsS3ContentType", constant("text/csv"))
.log(LoggingLevel.INFO, "RESPONSE Headers ${headers}").end()
.log(LoggingLevel.INFO, "consuming", "Consumer Fired!")
.log(LoggingLevel.INFO, "Replay Message Sent to file:s3out ${in.header.CamelAwsS3Key}")
.filter(simple("${in.header.CamelAwsS3Key} contains 'score_input'"))
.unmarshal().gzip()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
String filename = exchange.getIn().getHeader(S3Constants.KEY, String.class);
String body = exchange.getIn().getBody(String.class);
ContentBody cd = new InputStreamBody(new ByteArrayInputStream(body.getBytes()),ContentType.MULTIPART_FORM_DATA, "temp.csv");
multipartEntityBuilder.addPart("file", cd);
exchange.getOut().setBody(multipartEntityBuilder.build());
}
})
.to(httpRoute
当执行 Camel AWS S3 消费者从 S3 读取文件并将它们传递到 Rest End 点失败时,因为文件主体变为空。我使用类似的代码从 sftp 使用文件,它有效。但是当我使用 Aws 端点时,它失败了。 exchange s3对象还有其他设置吗?
当使用来自 s3 的文件并将其传递给 Rest 时,文件主体变为空 API。
代码:
from("aws-s3://test?amazonS3Client=#amazonS3Client&deleteAfterRead=false&delay=5000&synchronous=true&includeBody=true&autocloseBody=false&exchangePattern=InOut")
.convertBodyTo(byte[].class)
.log(LoggingLevel.INFO, "consuming", "Consumer Fired!")
.log(LoggingLevel.INFO, "Replay Message Sent to file:s3out ${in.header.CamelAwsS3Key}")
.filter(simple("${in.header.CamelAwsS3Key} contains 'score_input'"))
// .to("file:target/s3out?fileName=${in.header.CamelAwsS3Key}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
String filename = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
File file = exchange.getIn().getBody(File.class);
multipartEntityBuilder.addPart("file",
new FileBody(file, ContentType.MULTIPART_FORM_DATA, filename));
exchange.getOut().setBody(multipartEntityBuilder.build());
}
})
.to(httpRoute)
错误:
2020-08-02 21:34:57,413 [ws-s3://ds_test] INFO consuming - Consumer Fired!
2020-08-02 21:34:57,414 [ws-s3://ds_test] INFO route1 - Replay Message Sent to file:s3out input_0.csv.gz
2020-08-02 21:34:57,415 [ws-s3://ds_test] ERROR DefaultErrorHandler - Failed delivery for (MessageId: ID-XXXXXX-Mac-49195-1596384290692-0-5 on ExchangeId: ID-XXXXXXX-Mac-49195-1596384290692-0-6). Exhausted after delivery attempt: 1 caught: java.lang.IllegalArgumentException: File may not be null
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [aws-s3://test?amazonS3Client=%23amazonS3Client&autocloseBody=false&delay=50] [ 218]
[route1 ] [convertBodyTo1 ] [convertBodyTo[byte[]] ] [ 216]
[route1 ] [log1 ] [log ] [ 0]
[route1 ] [log2 ] [log ] [ 1]
[route1 ] [filter1 ] [filter[simple{Simple: ${in.header.CamelAwsS3Key} contains 'score_input'}] ] [ 1]
[route1 ] [process1 ] [Processor@0x7aa3628c ] [ 0]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalArgumentException: File may not be null
at org.apache.http.util.Args.notNull(Args.java:54) ~[httpcore-4.4.4.jar:4.4.4]
at org.apache.http.entity.mime.content.FileBody.<init>(FileBody.java:97) ~[httpmime-4.5.1.jar:4.5.1]
at org.apache.camel.example.cdi.aws.s3.Application$AwsS3Route.process(Application.java:101) ~[classes/:?]
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.FilterProcessor.process(FilterProcessor.java:57) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.component.aws.s3.S3Consumer.processBatch(S3Consumer.java:157) [camel-aws-2.18.2.jar:2.18.2]
at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:101) [camel-aws-2.18.2.jar:2.18.2]
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) [camel-core-2.18.2.jar:2.18.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:830) [?:?]
2020-08-02 21:34:57,419 [ws-s3://ds_test] WARN S3Consumer - Exchange failed, so rolling back message status: Exchange[ID-XXXXXX-Mac-49195-1596384290692-0-6]
java.lang.IllegalArgumentException: File may not be null
at org.apache.http.util.Args.notNull(Args.java:54) ~[httpcore-4.4.4.jar:4.4.4]
at org.apache.http.entity.mime.content.FileBody.<init>(FileBody.java:97) ~[httpmime-4.5.1.jar:4.5.1]
at org.apache.camel.example.cdi.aws.s3.Application$AwsS3Route.process(Application.java:101) ~[classes/:?]
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.FilterProcessor.process(FilterProcessor.java:57) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) ~[camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.component.aws.s3.S3Consumer.processBatch(S3Consumer.java:157) [camel-aws-2.18.2.jar:2.18.2]
at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:101) [camel-aws-2.18.2.jar:2.18.2]
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) [camel-core-2.18.2.jar:2.18.2]
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) [camel-core-2.18.2.jar:2.18.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:830) [?:?]
from("timer:CSVReader?period=10s")
.pollEnrich("aws-s3://test?amazonS3Client=#amazonS3Client&deleteAfterRead=false&delay=5000&synchronous=true&includeBody=true&autocloseBody=false&fileName=")
.setHeader("CamelAwsS3ContentType", constant("text/csv"))
.log(LoggingLevel.INFO, "RESPONSE Headers ${headers}").end()
.log(LoggingLevel.INFO, "consuming", "Consumer Fired!")
.log(LoggingLevel.INFO, "Replay Message Sent to file:s3out ${in.header.CamelAwsS3Key}")
.filter(simple("${in.header.CamelAwsS3Key} contains 'score_input'"))
.unmarshal().gzip()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
String filename = exchange.getIn().getHeader(S3Constants.KEY, String.class);
String body = exchange.getIn().getBody(String.class);
ContentBody cd = new InputStreamBody(new ByteArrayInputStream(body.getBytes()),ContentType.MULTIPART_FORM_DATA, "temp.csv");
multipartEntityBuilder.addPart("file", cd);
exchange.getOut().setBody(multipartEntityBuilder.build());
}
})
.to(httpRoute