Camel: AWS-S3: File body null when consuming a file from S3 and passing it to a REST API

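I have a Camel AWS S3 consumer that reads files from S3 and passes them to a REST endpoint. It fails because the file body becomes null. I used similar code to consume files from sftp, and it works, but when I use the AWS endpoint it fails. Is there some other setting needed on the exchange for S3 objects?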

The file body becomes null when a file is consumed from S3 and passed to the REST API.

Code:

from("aws-s3://test?amazonS3Client=#amazonS3Client&deleteAfterRead=false&delay=5000&synchronous=true&includeBody=true&autocloseBody=false&exchangePattern=InOut")
        .convertBodyTo(byte[].class)
        .log(LoggingLevel.INFO, "consuming", "Consumer Fired!")
        .log(LoggingLevel.INFO, "Replay Message Sent to file:s3out ${in.header.CamelAwsS3Key}")
        .filter(simple("${in.header.CamelAwsS3Key} contains 'score_input'"))
            // .to("file:target/s3out?fileName=${in.header.CamelAwsS3Key}")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
                    String filename = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
                    File file = exchange.getIn().getBody(File.class);
                    multipartEntityBuilder.addPart("file",
                            new FileBody(file, ContentType.MULTIPART_FORM_DATA, filename));
                    exchange.getOut().setBody(multipartEntityBuilder.build());
                }
            })
            .to(httpRoute)

Error:

2020-08-02 21:34:57,413 [ws-s3://ds_test] INFO  consuming                      - Consumer Fired!
2020-08-02 21:34:57,414 [ws-s3://ds_test] INFO  route1                         - Replay Message Sent to file:s3out input_0.csv.gz
2020-08-02 21:34:57,415 [ws-s3://ds_test] ERROR DefaultErrorHandler            - Failed delivery for (MessageId: ID-XXXXXX-Mac-49195-1596384290692-0-5 on ExchangeId: ID-XXXXXXX-Mac-49195-1596384290692-0-6). Exhausted after delivery attempt: 1 caught: java.lang.IllegalArgumentException: File may not be null

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [aws-s3://test?amazonS3Client=%23amazonS3Client&autocloseBody=false&delay=50] [       218]
[route1            ] [convertBodyTo1    ] [convertBodyTo[byte[]]                                                         ] [       216]
[route1            ] [log1              ] [log                                                                           ] [         0]
[route1            ] [log2              ] [log                                                                           ] [         1]
[route1            ] [filter1           ] [filter[simple{Simple: ${in.header.CamelAwsS3Key} contains 'score_input'}]     ] [         1]
[route1            ] [process1          ] [Processor@0x7aa3628c                                                          ] [         0]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalArgumentException: File may not be null
        at org.apache.http.util.Args.notNull(Args.java:54) ~[httpcore-4.4.4.jar:4.4.4]
        at org.apache.http.entity.mime.content.FileBody.<init>(FileBody.java:97) ~[httpmime-4.5.1.jar:4.5.1]
        at org.apache.camel.example.cdi.aws.s3.Application$AwsS3Route.process(Application.java:101) ~[classes/:?]
        at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.FilterProcessor.process(FilterProcessor.java:57) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.component.aws.s3.S3Consumer.processBatch(S3Consumer.java:157) [camel-aws-2.18.2.jar:2.18.2]
        at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:101) [camel-aws-2.18.2.jar:2.18.2]
        at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) [camel-core-2.18.2.jar:2.18.2]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:830) [?:?]
2020-08-02 21:34:57,419 [ws-s3://ds_test] WARN  S3Consumer                     - Exchange failed, so rolling back message status: Exchange[ID-XXXXXX-Mac-49195-1596384290692-0-6]
java.lang.IllegalArgumentException: File may not be null
        at org.apache.http.util.Args.notNull(Args.java:54) ~[httpcore-4.4.4.jar:4.4.4]
        at org.apache.http.entity.mime.content.FileBody.<init>(FileBody.java:97) ~[httpmime-4.5.1.jar:4.5.1]
        at org.apache.camel.example.cdi.aws.s3.Application$AwsS3Route.process(Application.java:101) ~[classes/:?]
        at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.FilterProcessor.process(FilterProcessor.java:57) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.component.aws.s3.S3Consumer.processBatch(S3Consumer.java:157) [camel-aws-2.18.2.jar:2.18.2]
        at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:101) [camel-aws-2.18.2.jar:2.18.2]
        at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) [camel-core-2.18.2.jar:2.18.2]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:830) [?:?]
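
The exception is thrown by the `FileBody` constructor, so `exchange.getIn().getBody(File.class)` must have returned null in the processor. One likely reading, which the post does not confirm, is that after `convertBodyTo(byte[].class)` the body is a byte array and Camel finds no conversion from it to `java.io.File`. If a `File` really is required, a minimal sketch using only the JDK is to write the bytes to a temporary file first. The class and method names here (`BodyToTempFile`, `toTempFile`) are hypothetical and are not part of Camel or HttpMime.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for illustration; not part of Camel.
class BodyToTempFile {

    // Writes the given bytes to a new temporary file and returns it.
    static File toTempFile(byte[] content, String suffix) throws IOException {
        Path tmp = Files.createTempFile("s3-body-", suffix);
        Files.write(tmp, content);
        File file = tmp.toFile();
        file.deleteOnExit(); // best-effort cleanup when the JVM exits
        return file;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for exchange.getIn().getBody(byte[].class)
        byte[] body = "id,score\n1,0.9\n".getBytes(StandardCharsets.UTF_8);
        File file = toTempFile(body, ".csv");
        System.out.println("wrote " + file.length() + " bytes to " + file.getName());
    }
}
```

Inside the processor, the returned `File` could then be handed to `FileBody` in place of the null value. Building the part from the bytes directly avoids the temporary file altogether.
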

A variant of the route that polls S3 from a timer, gunzips the object, and builds the multipart part from the body read as a String instead of a File:

from("timer:CSVReader?period=10s")
        // Fetch the S3 object on each timer tick
        .pollEnrich("aws-s3://test?amazonS3Client=#amazonS3Client&deleteAfterRead=false&delay=5000&synchronous=true&includeBody=true&autocloseBody=false&fileName=")
        .setHeader("CamelAwsS3ContentType", constant("text/csv"))
        .log(LoggingLevel.INFO, "RESPONSE Headers ${headers}").end()
        .log(LoggingLevel.INFO, "consuming", "Consumer Fired!")
        .log(LoggingLevel.INFO, "Replay Message Sent to file:s3out ${in.header.CamelAwsS3Key}")
        .filter(simple("${in.header.CamelAwsS3Key} contains 'score_input'"))
            // Decompress the gzip-compressed object before reading it
            .unmarshal().gzip()
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
                    String filename = exchange.getIn().getHeader(S3Constants.KEY, String.class);
                    // Read the content as a String rather than a File
                    String body = exchange.getIn().getBody(String.class);
                    // getBytes() uses the platform default charset
                    ContentBody cd = new InputStreamBody(new ByteArrayInputStream(body.getBytes()), ContentType.MULTIPART_FORM_DATA, "temp.csv");
                    multipartEntityBuilder.addPart("file", cd);
                    exchange.getOut().setBody(multipartEntityBuilder.build());
                }
            })
            .to(httpRoute)
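
The `.unmarshal().gzip()` step in the route above decompresses the object before the processor reads it (the log shows a key ending in `.csv.gz`). As a rough illustration of what that step amounts to, the same decompression can be sketched with the JDK alone. The names `GzipBody`, `gzip` and `gunzip` are hypothetical, and UTF-8 is an assumption about the file's encoding that the post does not state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper for illustration; not part of Camel.
class GzipBody {

    // Decompresses gzip-compressed bytes into the original bytes.
    static byte[] gunzip(byte[] compressed) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        }
    }

    // Compresses bytes with gzip; used here only to produce sample input.
    static byte[] gzip(byte[] plain) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(plain);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] compressed = gzip("id,score\n1,0.9\n".getBytes(StandardCharsets.UTF_8));
        // Decode with an explicit charset instead of the platform default
        String csv = new String(gunzip(compressed), StandardCharsets.UTF_8);
        System.out.print(csv);
    }
}
```

Passing an explicit charset when converting between `String` and bytes keeps the uploaded content independent of the platform default that `body.getBytes()` would use.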