如何根据 header 值 "group by" 消息

how to "group by" messages based on a header value

我正在尝试根据遵循此标准的文件扩展名创建一个文件 zip:filename.{NUMBER},我正在做的是读取一个文件夹,按 .{number} 分组,然后创建一个独特的 .zip 文件,最后带有 .num,例如:

文件夹/

file.01

file2.01

file.02

file2.02

文件夹 -> /已处理

file.01.zip which contains -> file.01, file2.01

file02.zip which contains -> file.02, file2.02

我所做的是使用 outboundGateway,拆分文件,丰富 headers 读取文件扩展名,然后聚合读取 header,但似乎无法正常工作。

public IntegrationFlow integrationFlow() {
return flow
.handle(Ftp.outboundGateway(FTPServers.PC_LOCAL.getFactory(), AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
                .fileExistsMode(FileExistsMode.REPLACE)
                .filterFunction(ftpFile -> {
                    int extensionIndex = ftpFile.getName().indexOf(".");
                    return extensionIndex != -1 && ftpFile.getName().substring(extensionIndex).matches("\.([0-9]*)");
                })
                .localDirectory(new File("/tmp")))
            .split() //receiving an iterator, creates a message for each file
            .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.headerExpression("warehouseId", "payload.getName().substring(payload.getName().indexOf('.') +1)"))
            .aggregate(aggregatorSpec -> aggregatorSpec.correlationExpression("headers['warehouseId']"))
            .transform(new ZipTransformer())
            .log(message -> {
                log.info(message.getHeaders().toString());
                return message;
            });
}

它给我一条包含所有文件的消息,我应该会收到 2 条消息。

由于这个 dsl 的性质,我有一个动态数量的文件,所以我无法计算以相同数字结尾的消息(文件),我认为超时不是一个好的发布策略,我只是自己写的代码,没有写入磁盘:


.<List<File>, List<Message<ByteArrayOutputStream>>>transform(files -> {
                HashMap<String, ZipOutputStream> zipOutputStreamHashMap = new HashMap<>();
                HashMap<String, ByteArrayOutputStream> zipByteArrayMap = new HashMap<>();
                ArrayList<Message<ByteArrayOutputStream>> messageList = new ArrayList<>();
                files.forEach(file -> {
                    String warehouseId = file.getName().substring(file.getName().indexOf('.') + 1);
                    ZipOutputStream warehouseStream = zipOutputStreamHashMap.computeIfAbsent(warehouseId, s -> new ZipOutputStream(zipByteArrayMap.computeIfAbsent(s, s1 -> new ByteArrayOutputStream())));
                    try {
                        warehouseStream.putNextEntry(new ZipEntry(file.getName()));
                        FileInputStream inputStream = new FileInputStream(file);
                        byte[] bytes = new byte[4096];
                        int length;
                        while ((length = inputStream.read(bytes)) >= 0) {
                            warehouseStream.write(bytes, 0, length);
                        }
                        inputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
                zipOutputStreamHashMap.forEach((s, zipOutputStream) -> {
                    try {
                        zipOutputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
                zipByteArrayMap.forEach((key, byteArrayOutputStream) -> {
                    messageList.add(MessageBuilder.withPayload(byteArrayOutputStream).setHeader("warehouseId", key).build());
                });

                return messageList;
            })
            .split()
            .transform(ByteArrayOutputStream::toByteArray)
            .handle(Ftp.outboundAdapter(FTPServers.PC_LOCAL.getFactory(), FileExistsMode.REPLACE)
            ......