如何使用 Java(首选 Java 8)通过多线程解压大型压缩文件夹?
How can I unzip huge folder with multithreading with java - preferred java8?
参考:
http://www.pixeldonor.com/2013/oct/12/concurrent-zip-compression-java-nio/
我正在尝试解压 5GB 的压缩文件,平均需要大约 30 分钟,这对我们的应用程序来说很长,我正在努力减少时间。
我尝试了很多组合,更改了缓冲区大小(默认情况下我的写入块为 4096 字节),更改了 NIO 方法、库,所有结果都非常相同。
还有一种做法尚未尝试:将压缩文件按块分割,然后用多个线程分块读取。
代码段是:
// Shared fixed-size pool of 20 worker threads; unzip() submits its extraction tasks to it.
private static ExecutorService e = Executors.newFixedThreadPool(20);
/**
 * Entry point: runs {@code unzip} on a hard-coded archive and target directory and prints
 * the number of milliseconds the call took to return. Any exception is printed, not rethrown.
 *
 * @param argv command-line arguments (unused)
 */
public static void main(String[] argv) {
    final String zipPath = "/Users/xx/Documents/test123/large.zip";
    final String outputDir = "/Users/xx/Documents/test2";
    try {
        final long startedAt = System.currentTimeMillis();
        unzip(outputDir, zipPath);
        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        System.out.println(elapsedMillis);
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
/**
 * Attempts to extract {@code zipFilename} into {@code targetDir} using the shared executor.
 *
 * First walks the archive with a single ZipInputStream to collect every ZipEntry, then
 * splits the collected entries into partitions of 1000 and submits one Multi task per
 * partition. Exceptions are only printed; nothing is rethrown to the caller.
 *
 * NOTE(review): every task receives the same ZipInputStream instance, and that stream has
 * already been advanced until getNextEntry() returned null before any task is submitted.
 * The workers therefore have no entry data left to read, which is consistent with the
 * "directories but no file contents" symptom described alongside this snippet.
 * NOTE(review): this method neither closes the stream nor shuts down / awaits the executor,
 * so it returns as soon as the tasks are queued.
 *
 * @param targetDir   directory the entries should be written under
 * @param zipFilename path of the zip archive to read
 */
public static void unzip(String targetDir, String zipFilename) {
ZipInputStream archive;
try {
List<ZipEntry> list = new ArrayList<>();
archive = new ZipInputStream(new BufferedInputStream(new FileInputStream(zipFilename)));
ZipEntry entry;
// Pass 1: enumerate all entries; this consumes the stream up to its end.
while ((entry = archive.getNextEntry()) != null) {
list.add(entry);
}
// Pass 2: hand each chunk of 1000 entries to a worker (Lists is presumably Guava's — confirm).
for (List<ZipEntry> partition : Lists.partition(list, 1000)) {
// Here `e` is the executor field; the catch parameter below shadows it only inside the catch.
e.submit(new Multi(targetDir, partition, archive));
}
} catch (Exception e){
e.printStackTrace();
}
}
对应的 Runnable 实现如下:
/**
 * Worker that writes one partition of zip entries below a target directory.
 *
 * Directory entries are created with {@code mkdirs()}; for every other entry the parent
 * directories are created and whatever {@code zipInputStream.read} currently yields is
 * copied into the destination file. The stream is used without any synchronization and is
 * never repositioned by this class, so what gets read depends entirely on the state of the
 * stream supplied by the caller.
 */
static class Multi implements Runnable {
    private final List<ZipEntry> partition;
    private final ZipInputStream zipInputStream;
    private final String targetDir;

    /**
     * @param targetDir      directory the entries are written under
     * @param partition      entries this worker is responsible for
     * @param zipInputStream stream the entry data is read from
     */
    public Multi(String targetDir, List<ZipEntry> partition, ZipInputStream zipInputStream) {
        this.partition = partition;
        this.zipInputStream = zipInputStream;
        this.targetDir = targetDir;
    }

    /**
     * Processes every entry of the partition. I/O failures are printed and the loop moves
     * on to the next entry.
     */
    @Override
    public void run() {
        // One buffer per worker is enough; it is fully overwritten on every read.
        final byte[] buf = new byte[BUFSIZE];
        for (ZipEntry entry : partition) {
            File entryDestination = new File(targetDir, entry.getName());
            if (entry.isDirectory()) {
                entryDestination.mkdirs();
                continue;
            }
            File parent = entryDestination.getParentFile();
            if (parent != null) {
                parent.mkdirs();
            }
            // try-with-resources: the previous finally block called close() on a null
            // reference when the file could not be opened, killing the whole task with an NPE.
            try (BufferedOutputStream output =
                    new BufferedOutputStream(new FileOutputStream(entryDestination), BUFSIZE)) {
                int n;
                while ((n = zipInputStream.read(buf, 0, BUFSIZE)) != -1) {
                    output.write(buf, 0, n);
                }
            } catch (IOException e1) {
                // Covers FileNotFoundException as well; close() flushes the buffered data.
                e1.printStackTrace();
            }
        }
    }
}
但出于某种原因它只存储没有文件内容的目录...
我的问题是:关于上述 "compression" 文章的方法,在大型 zip 文件上使用多线程制作块的正确方法是什么?
ZipInputStream 是一个单一的数据流,不能拆分。
如果要多线程解压,需要使用 ZipFile。
使用 Java 8,您甚至可以免费获得多线程。
/**
 * Extracts all entries of a zip archive into a directory, processing the entries through a
 * parallel stream so that several entries can be unpacked at the same time.
 *
 * @param targetDir   directory to extract into
 * @param zipFilename path of the zip archive
 * @throws RuntimeException if the archive cannot be opened or closed
 */
public static void unzip(String targetDir, String zipFilename) {
    final Path destination = Paths.get(targetDir);
    try (ZipFile archive = new ZipFile(zipFilename)) {
        archive.stream()
                .parallel() // enable multi-threading
                .forEach(zipEntry -> unzipEntry(archive, zipEntry, destination));
    } catch (IOException openFailure) {
        throw new RuntimeException(
                "Error opening zip file '" + zipFilename + "': " + openFailure, openFailure);
    }
}
/**
 * Extracts a single zip entry below {@code targetDir}.
 *
 * Directory entries become directories; for file entries the parent directories are
 * created first and the entry content is streamed to disk, replacing an existing file.
 *
 * @param zipFile   archive the entry belongs to
 * @param entry     entry to extract
 * @param targetDir directory to extract into
 * @throws RuntimeException wrapping the {@link IOException} if the entry cannot be written
 *                          or would be written outside of {@code targetDir}
 */
private static void unzipEntry(ZipFile zipFile, ZipEntry entry, Path targetDir) {
    try {
        Path baseDir = targetDir.toAbsolutePath().normalize();
        Path targetPath = baseDir.resolve(entry.getName()).normalize();
        // Guard against "zip slip": entry names such as "../../x" must not escape the target.
        if (!targetPath.startsWith(baseDir)) {
            throw new IOException("Entry resolves outside of target directory: " + targetPath);
        }
        // Decide by the entry itself, not by what already exists on disk: a directory entry
        // that is not on disk yet would otherwise be written as an empty regular file and
        // block every later entry located beneath it.
        if (entry.isDirectory()) {
            Files.createDirectories(targetPath);
        } else {
            Path parent = targetPath.getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            try (InputStream in = zipFile.getInputStream(entry)) {
                Files.copy(in, targetPath, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    } catch (IOException e) {
        throw new RuntimeException("Error processing zip entry '" + entry.getName() + "': " + e, e);
    }
}
您可能还想查看另一个回答(this answer),它使用 FileSystem 访问 zip 文件内容,以获得真正的 Java 8 体验。
下面是利用 FileSystem 的并行版本。你应该稍微调整一下(例如真正使用流式读写、添加错误处理),但这应该是一个不错的开始。
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
 * Extracts the regular files of a zip archive in parallel.
 *
 * The archive is mounted as a {@link FileSystem}; the main thread walks its file tree and
 * submits one copy task per regular file to a fixed pool of two worker threads.
 */
public class ParallelUnzip {

    /** File visitor that forwards every regular file it visits to a callback. */
    static class UnzipVisitor extends SimpleFileVisitor<Path> {
        private final Consumer<Path> unzipper;

        /**
         * @param unzipper callback invoked with each regular file that is visited
         */
        public UnzipVisitor(Consumer<Path> unzipper) {
            this.unzipper = unzipper;
        }

        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            if (Files.isRegularFile(file)) {
                unzipper.accept(file);
            }
            return FileVisitResult.CONTINUE;
        }
    }

    /**
     * Creates the parent directory chain of {@code path} if it does not exist yet.
     * A path without a parent component is left alone.
     *
     * @param path file whose parent directories are required
     * @throws IOException if the directories cannot be created
     */
    // I would not risk creating directories in parallel, so adding synchronized here
    synchronized static void createDirectories(Path path) throws IOException {
        Path parent = path.getParent();
        if (parent != null && !Files.exists(parent)) {
            Files.createDirectories(parent);
        }
    }

    /**
     * Copies one file from the zip file system to its location below {@code target}.
     * Failures are printed and rethrown as {@link RuntimeException}.
     */
    private static void extract(Path path, Path target) {
        try {
            // Drop the leading '/' of the zip-internal path to make it relative to target.
            Path t = target.resolve(path.toString().substring(1));
            createDirectories(t);
            System.out.println("Extracting with thread " + Thread.currentThread().getName() + " File: "
                    + path.toAbsolutePath().toString() + " -> " + t.toAbsolutePath().toString());
            // Stream the content instead of buffering the whole file in memory; the default
            // open options (create, truncate, write) match those of Files.write.
            try (java.io.OutputStream out = Files.newOutputStream(t)) {
                Files.copy(path, out);
            }
        } catch (Exception ioe) {
            ioe.printStackTrace();
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Extracts {@code /tests.zip} into the relative directory {@code target}.
     *
     * @param args unused
     * @throws IOException          if the archive cannot be opened or walked
     * @throws InterruptedException if interrupted while waiting for the workers
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // The zip file system stays open until every worker is done and is then closed.
        try (FileSystem fs = FileSystems.newFileSystem(URI.create("jar:file:/tests.zip"), new HashMap<>())) {
            Path root = fs.getRootDirectories().iterator().next();
            Path target = Paths.get("target");
            ExecutorService executor = Executors.newFixedThreadPool(2);
            try {
                Files.walkFileTree(root, new UnzipVisitor((path) -> {
                    System.out.println(Thread.currentThread().getName() + " " + path.toAbsolutePath().toString());
                    executor.submit(() -> extract(path, target));
                }));
            } finally {
                // Always release the pool, even if walking fails, and wait for all queued
                // tasks so the file system is not closed underneath them.
                executor.shutdown();
                while (!executor.awaitTermination(1000, TimeUnit.SECONDS)) {
                    // still extracting; keep waiting
                }
            }
        }
    }
}
参考: http://www.pixeldonor.com/2013/oct/12/concurrent-zip-compression-java-nio/
我正在尝试解压 5GB 的压缩文件,平均需要大约 30 分钟,这对我们的应用程序来说很长,我正在努力减少时间。
我尝试了很多组合,更改了缓冲区大小(默认情况下我的写入块为 4096 字节),更改了 NIO 方法、库,所有结果都非常相同。
还有一种做法尚未尝试:将压缩文件按块分割,然后用多个线程分块读取。
代码段是:
// Shared fixed-size pool of 20 worker threads; unzip() submits its extraction tasks to it.
private static ExecutorService e = Executors.newFixedThreadPool(20);
/**
 * Entry point: runs {@code unzip} on a hard-coded archive and target directory and prints
 * the number of milliseconds the call took to return. Any exception is printed, not rethrown.
 *
 * @param argv command-line arguments (unused)
 */
public static void main(String[] argv) {
    final String zipPath = "/Users/xx/Documents/test123/large.zip";
    final String outputDir = "/Users/xx/Documents/test2";
    try {
        final long startedAt = System.currentTimeMillis();
        unzip(outputDir, zipPath);
        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        System.out.println(elapsedMillis);
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
/**
 * Attempts to extract {@code zipFilename} into {@code targetDir} using the shared executor.
 *
 * First walks the archive with a single ZipInputStream to collect every ZipEntry, then
 * splits the collected entries into partitions of 1000 and submits one Multi task per
 * partition. Exceptions are only printed; nothing is rethrown to the caller.
 *
 * NOTE(review): every task receives the same ZipInputStream instance, and that stream has
 * already been advanced until getNextEntry() returned null before any task is submitted.
 * The workers therefore have no entry data left to read, which is consistent with the
 * "directories but no file contents" symptom described alongside this snippet.
 * NOTE(review): this method neither closes the stream nor shuts down / awaits the executor,
 * so it returns as soon as the tasks are queued.
 *
 * @param targetDir   directory the entries should be written under
 * @param zipFilename path of the zip archive to read
 */
public static void unzip(String targetDir, String zipFilename) {
ZipInputStream archive;
try {
List<ZipEntry> list = new ArrayList<>();
archive = new ZipInputStream(new BufferedInputStream(new FileInputStream(zipFilename)));
ZipEntry entry;
// Pass 1: enumerate all entries; this consumes the stream up to its end.
while ((entry = archive.getNextEntry()) != null) {
list.add(entry);
}
// Pass 2: hand each chunk of 1000 entries to a worker (Lists is presumably Guava's — confirm).
for (List<ZipEntry> partition : Lists.partition(list, 1000)) {
// Here `e` is the executor field; the catch parameter below shadows it only inside the catch.
e.submit(new Multi(targetDir, partition, archive));
}
} catch (Exception e){
e.printStackTrace();
}
}
对应的 Runnable 实现如下:
/**
 * Worker that writes one partition of zip entries below a target directory.
 *
 * Directory entries are created with {@code mkdirs()}; for every other entry the parent
 * directories are created and whatever {@code zipInputStream.read} currently yields is
 * copied into the destination file. The stream is used without any synchronization and is
 * never repositioned by this class, so what gets read depends entirely on the state of the
 * stream supplied by the caller.
 */
static class Multi implements Runnable {
    private final List<ZipEntry> partition;
    private final ZipInputStream zipInputStream;
    private final String targetDir;

    /**
     * @param targetDir      directory the entries are written under
     * @param partition      entries this worker is responsible for
     * @param zipInputStream stream the entry data is read from
     */
    public Multi(String targetDir, List<ZipEntry> partition, ZipInputStream zipInputStream) {
        this.partition = partition;
        this.zipInputStream = zipInputStream;
        this.targetDir = targetDir;
    }

    /**
     * Processes every entry of the partition. I/O failures are printed and the loop moves
     * on to the next entry.
     */
    @Override
    public void run() {
        // One buffer per worker is enough; it is fully overwritten on every read.
        final byte[] buf = new byte[BUFSIZE];
        for (ZipEntry entry : partition) {
            File entryDestination = new File(targetDir, entry.getName());
            if (entry.isDirectory()) {
                entryDestination.mkdirs();
                continue;
            }
            File parent = entryDestination.getParentFile();
            if (parent != null) {
                parent.mkdirs();
            }
            // try-with-resources: the previous finally block called close() on a null
            // reference when the file could not be opened, killing the whole task with an NPE.
            try (BufferedOutputStream output =
                    new BufferedOutputStream(new FileOutputStream(entryDestination), BUFSIZE)) {
                int n;
                while ((n = zipInputStream.read(buf, 0, BUFSIZE)) != -1) {
                    output.write(buf, 0, n);
                }
            } catch (IOException e1) {
                // Covers FileNotFoundException as well; close() flushes the buffered data.
                e1.printStackTrace();
            }
        }
    }
}
但出于某种原因它只存储没有文件内容的目录...
我的问题是:关于上述 "compression" 文章的方法,在大型 zip 文件上使用多线程制作块的正确方法是什么?
ZipInputStream 是一个单一的数据流,不能拆分。
如果要多线程解压,需要使用 ZipFile。
使用 Java 8,您甚至可以免费获得多线程。
/**
 * Extracts all entries of a zip archive into a directory, processing the entries through a
 * parallel stream so that several entries can be unpacked at the same time.
 *
 * @param targetDir   directory to extract into
 * @param zipFilename path of the zip archive
 * @throws RuntimeException if the archive cannot be opened or closed
 */
public static void unzip(String targetDir, String zipFilename) {
    final Path destination = Paths.get(targetDir);
    try (ZipFile archive = new ZipFile(zipFilename)) {
        archive.stream()
                .parallel() // enable multi-threading
                .forEach(zipEntry -> unzipEntry(archive, zipEntry, destination));
    } catch (IOException openFailure) {
        throw new RuntimeException(
                "Error opening zip file '" + zipFilename + "': " + openFailure, openFailure);
    }
}
/**
 * Extracts a single zip entry below {@code targetDir}.
 *
 * Directory entries become directories; for file entries the parent directories are
 * created first and the entry content is streamed to disk, replacing an existing file.
 *
 * @param zipFile   archive the entry belongs to
 * @param entry     entry to extract
 * @param targetDir directory to extract into
 * @throws RuntimeException wrapping the {@link IOException} if the entry cannot be written
 *                          or would be written outside of {@code targetDir}
 */
private static void unzipEntry(ZipFile zipFile, ZipEntry entry, Path targetDir) {
    try {
        Path baseDir = targetDir.toAbsolutePath().normalize();
        Path targetPath = baseDir.resolve(entry.getName()).normalize();
        // Guard against "zip slip": entry names such as "../../x" must not escape the target.
        if (!targetPath.startsWith(baseDir)) {
            throw new IOException("Entry resolves outside of target directory: " + targetPath);
        }
        // Decide by the entry itself, not by what already exists on disk: a directory entry
        // that is not on disk yet would otherwise be written as an empty regular file and
        // block every later entry located beneath it.
        if (entry.isDirectory()) {
            Files.createDirectories(targetPath);
        } else {
            Path parent = targetPath.getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            try (InputStream in = zipFile.getInputStream(entry)) {
                Files.copy(in, targetPath, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    } catch (IOException e) {
        throw new RuntimeException("Error processing zip entry '" + entry.getName() + "': " + e, e);
    }
}
您可能还想查看另一个回答(this answer),它使用 FileSystem 访问 zip 文件内容,以获得真正的 Java 8 体验。
下面是利用 FileSystem 的并行版本。你应该稍微调整一下(例如真正使用流式读写、添加错误处理),但这应该是一个不错的开始。
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
 * Extracts the regular files of a zip archive in parallel.
 *
 * The archive is mounted as a {@link FileSystem}; the main thread walks its file tree and
 * submits one copy task per regular file to a fixed pool of two worker threads.
 */
public class ParallelUnzip {

    /** File visitor that forwards every regular file it visits to a callback. */
    static class UnzipVisitor extends SimpleFileVisitor<Path> {
        private final Consumer<Path> unzipper;

        /**
         * @param unzipper callback invoked with each regular file that is visited
         */
        public UnzipVisitor(Consumer<Path> unzipper) {
            this.unzipper = unzipper;
        }

        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            if (Files.isRegularFile(file)) {
                unzipper.accept(file);
            }
            return FileVisitResult.CONTINUE;
        }
    }

    /**
     * Creates the parent directory chain of {@code path} if it does not exist yet.
     * A path without a parent component is left alone.
     *
     * @param path file whose parent directories are required
     * @throws IOException if the directories cannot be created
     */
    // I would not risk creating directories in parallel, so adding synchronized here
    synchronized static void createDirectories(Path path) throws IOException {
        Path parent = path.getParent();
        if (parent != null && !Files.exists(parent)) {
            Files.createDirectories(parent);
        }
    }

    /**
     * Copies one file from the zip file system to its location below {@code target}.
     * Failures are printed and rethrown as {@link RuntimeException}.
     */
    private static void extract(Path path, Path target) {
        try {
            // Drop the leading '/' of the zip-internal path to make it relative to target.
            Path t = target.resolve(path.toString().substring(1));
            createDirectories(t);
            System.out.println("Extracting with thread " + Thread.currentThread().getName() + " File: "
                    + path.toAbsolutePath().toString() + " -> " + t.toAbsolutePath().toString());
            // Stream the content instead of buffering the whole file in memory; the default
            // open options (create, truncate, write) match those of Files.write.
            try (java.io.OutputStream out = Files.newOutputStream(t)) {
                Files.copy(path, out);
            }
        } catch (Exception ioe) {
            ioe.printStackTrace();
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Extracts {@code /tests.zip} into the relative directory {@code target}.
     *
     * @param args unused
     * @throws IOException          if the archive cannot be opened or walked
     * @throws InterruptedException if interrupted while waiting for the workers
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // The zip file system stays open until every worker is done and is then closed.
        try (FileSystem fs = FileSystems.newFileSystem(URI.create("jar:file:/tests.zip"), new HashMap<>())) {
            Path root = fs.getRootDirectories().iterator().next();
            Path target = Paths.get("target");
            ExecutorService executor = Executors.newFixedThreadPool(2);
            try {
                Files.walkFileTree(root, new UnzipVisitor((path) -> {
                    System.out.println(Thread.currentThread().getName() + " " + path.toAbsolutePath().toString());
                    executor.submit(() -> extract(path, target));
                }));
            } finally {
                // Always release the pool, even if walking fails, and wait for all queued
                // tasks so the file system is not closed underneath them.
                executor.shutdown();
                while (!executor.awaitTermination(1000, TimeUnit.SECONDS)) {
                    // still extracting; keep waiting
                }
            }
        }
    }
}