如何根据 header 值 "group by" 消息
how to "group by" messages based on a header value
我正在尝试根据遵循此标准的文件扩展名创建一个文件 zip:filename.{NUMBER},我正在做的是读取一个文件夹,按 .{number} 分组,然后创建一个独特的 .zip 文件,最后带有 .num,例如:
文件夹/
file.01
file2.01
file.02
file2.02
文件夹 -> /已处理
file.01.zip which contains -> file.01, file2.01
file02.zip which contains -> file.02, file2.02
我所做的是使用 outboundGateway,拆分文件,丰富 headers 读取文件扩展名,然后聚合读取 header,但似乎无法正常工作。
public IntegrationFlow integrationFlow() {
return flow
.handle(Ftp.outboundGateway(FTPServers.PC_LOCAL.getFactory(), AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.fileExistsMode(FileExistsMode.REPLACE)
.filterFunction(ftpFile -> {
int extensionIndex = ftpFile.getName().indexOf(".");
return extensionIndex != -1 && ftpFile.getName().substring(extensionIndex).matches("\.([0-9]*)");
})
.localDirectory(new File("/tmp")))
.split() //receiving an iterator, creates a message for each file
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.headerExpression("warehouseId", "payload.getName().substring(payload.getName().indexOf('.') +1)"))
.aggregate(aggregatorSpec -> aggregatorSpec.correlationExpression("headers['warehouseId']"))
.transform(new ZipTransformer())
.log(message -> {
log.info(message.getHeaders().toString());
return message;
});
}
它给我一条包含所有文件的消息,我应该会收到 2 条消息。
由于这个 dsl 的性质,我有一个动态数量的文件,所以我无法计算以相同数字结尾的消息(文件),我认为超时不是一个好的发布策略,我只是自己写的代码,没有写入磁盘:
.<List<File>, List<Message<ByteArrayOutputStream>>>transform(files -> {
HashMap<String, ZipOutputStream> zipOutputStreamHashMap = new HashMap<>();
HashMap<String, ByteArrayOutputStream> zipByteArrayMap = new HashMap<>();
ArrayList<Message<ByteArrayOutputStream>> messageList = new ArrayList<>();
files.forEach(file -> {
String warehouseId = file.getName().substring(file.getName().indexOf('.') + 1);
ZipOutputStream warehouseStream = zipOutputStreamHashMap.computeIfAbsent(warehouseId, s -> new ZipOutputStream(zipByteArrayMap.computeIfAbsent(s, s1 -> new ByteArrayOutputStream())));
try {
warehouseStream.putNextEntry(new ZipEntry(file.getName()));
FileInputStream inputStream = new FileInputStream(file);
byte[] bytes = new byte[4096];
int length;
while ((length = inputStream.read(bytes)) >= 0) {
warehouseStream.write(bytes, 0, length);
}
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
});
zipOutputStreamHashMap.forEach((s, zipOutputStream) -> {
try {
zipOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
});
zipByteArrayMap.forEach((key, byteArrayOutputStream) -> {
messageList.add(MessageBuilder.withPayload(byteArrayOutputStream).setHeader("warehouseId", key).build());
});
return messageList;
})
.split()
.transform(ByteArrayOutputStream::toByteArray)
.handle(Ftp.outboundAdapter(FTPServers.PC_LOCAL.getFactory(), FileExistsMode.REPLACE)
......
我正在尝试根据遵循此标准的文件扩展名创建一个文件 zip:filename.{NUMBER},我正在做的是读取一个文件夹,按 .{number} 分组,然后创建一个独特的 .zip 文件,最后带有 .num,例如:
文件夹/
file.01
file2.01
file.02
file2.02
文件夹 -> /已处理
file.01.zip which contains -> file.01, file2.01
file02.zip which contains -> file.02, file2.02
我所做的是使用 outboundGateway,拆分文件,丰富 headers 读取文件扩展名,然后聚合读取 header,但似乎无法正常工作。
public IntegrationFlow integrationFlow() {
return flow
.handle(Ftp.outboundGateway(FTPServers.PC_LOCAL.getFactory(), AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.fileExistsMode(FileExistsMode.REPLACE)
.filterFunction(ftpFile -> {
int extensionIndex = ftpFile.getName().indexOf(".");
return extensionIndex != -1 && ftpFile.getName().substring(extensionIndex).matches("\.([0-9]*)");
})
.localDirectory(new File("/tmp")))
.split() //receiving an iterator, creates a message for each file
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.headerExpression("warehouseId", "payload.getName().substring(payload.getName().indexOf('.') +1)"))
.aggregate(aggregatorSpec -> aggregatorSpec.correlationExpression("headers['warehouseId']"))
.transform(new ZipTransformer())
.log(message -> {
log.info(message.getHeaders().toString());
return message;
});
}
它给我一条包含所有文件的消息,我应该会收到 2 条消息。
由于这个 dsl 的性质,我有一个动态数量的文件,所以我无法计算以相同数字结尾的消息(文件),我认为超时不是一个好的发布策略,我只是自己写的代码,没有写入磁盘:
.<List<File>, List<Message<ByteArrayOutputStream>>>transform(files -> {
HashMap<String, ZipOutputStream> zipOutputStreamHashMap = new HashMap<>();
HashMap<String, ByteArrayOutputStream> zipByteArrayMap = new HashMap<>();
ArrayList<Message<ByteArrayOutputStream>> messageList = new ArrayList<>();
files.forEach(file -> {
String warehouseId = file.getName().substring(file.getName().indexOf('.') + 1);
ZipOutputStream warehouseStream = zipOutputStreamHashMap.computeIfAbsent(warehouseId, s -> new ZipOutputStream(zipByteArrayMap.computeIfAbsent(s, s1 -> new ByteArrayOutputStream())));
try {
warehouseStream.putNextEntry(new ZipEntry(file.getName()));
FileInputStream inputStream = new FileInputStream(file);
byte[] bytes = new byte[4096];
int length;
while ((length = inputStream.read(bytes)) >= 0) {
warehouseStream.write(bytes, 0, length);
}
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
});
zipOutputStreamHashMap.forEach((s, zipOutputStream) -> {
try {
zipOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
});
zipByteArrayMap.forEach((key, byteArrayOutputStream) -> {
messageList.add(MessageBuilder.withPayload(byteArrayOutputStream).setHeader("warehouseId", key).build());
});
return messageList;
})
.split()
.transform(ByteArrayOutputStream::toByteArray)
.handle(Ftp.outboundAdapter(FTPServers.PC_LOCAL.getFactory(), FileExistsMode.REPLACE)
......