Can't copy from HDFS to S3A
I have a class that uses Apache FileUtil to copy directory contents from one location to another:
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

class Folder {

    private final FileSystem fs;
    private final Path pth;

    // ... constructors and other methods

    /**
     * Copy contents (files and files in subfolders) to another folder.
     * Merges overlapping folders
     * Overwrites already existing files
     * @param destination Folder where content will be moved to
     * @throws IOException If fails
     */
    public void copyFilesTo(final Folder destination) throws IOException {
        final RemoteIterator<LocatedFileStatus> iter = this.fs.listFiles(
            this.pth,
            true
        );
        final URI root = this.pth.toUri();
        while (iter.hasNext()) {
            final Path source = iter.next().getPath();
            FileUtil.copy(
                this.fs,
                source,
                destination.fs,
                new Path(
                    destination.pth,
                    root.relativize(source.toUri()).toString()
                ),
                false,
                true,
                this.fs.getConf()
            );
        }
    }
}
This class works fine in unit tests with local (file:///) directories, but when I try to use it in a Hadoop cluster to copy files from HDFS (hdfs:///tmp/result) to Amazon S3 (s3a://mybucket/out), it doesn't copy anything and doesn't throw any error; it just silently skips the copying.
When I use the same class (with either the HDFS or the S3A filesystem) for other purposes, it works fine, so the configuration and the fs references should be OK.
What am I doing wrong? How do I copy files from HDFS to S3A correctly?
I'm using Hadoop 2.7.3.
UPDATE
I added more logging to the copyFilesTo method to log the root, source and target variables (and extracted a rebase() method without changing the code):
/**
 * Copy contents (files and files in subfolders) to another folder.
 * Merges overlapping folders
 * Overwrites already existing files
 * @param dst Folder where content will be moved to
 * @throws IOException If fails
 */
public void copyFilesTo(final Folder dst) throws IOException {
    Logger.info(
        this, "copyFilesTo(%s): from %s fs=%s",
        dst, this, this.hdfs
    );
    final RemoteIterator<LocatedFileStatus> iter = this.hdfs.listFiles(
        this.pth,
        true
    );
    final URI root = this.pth.toUri();
    Logger.info(this, "copyFilesTo(%s): root=%s", dst, root);
    while (iter.hasNext()) {
        final Path source = iter.next().getPath();
        final Path target = Folder.rebase(dst.path(), this.path(), source);
        Logger.info(
            this, "copyFilesTo(%s): src=%s target=%s",
            dst, source, target
        );
        FileUtil.copy(
            this.hdfs,
            source,
            dst.hdfs,
            target,
            false,
            true,
            this.hdfs.getConf()
        );
    }
}

/**
 * Change the base of target URI to new base, using root
 * as common path.
 * @param base New base
 * @param root Common root
 * @param target Target to rebase
 * @return Path with new base
 */
static Path rebase(final Path base, final Path root, final Path target) {
    return new Path(
        base, root.toUri().relativize(target.toUri()).toString()
    );
}
After running it in the cluster, I got these logs:
io.Folder: copyFilesTo(hdfs:///tmp/_dst): from hdfs:///tmp/_src fs=DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_182008924_1, ugi=hadoop (auth:SIMPLE)]]
io.Folder: copyFilesTo(hdfs:///tmp/_dst): root=hdfs:///tmp/_src
INFO io.Folder: copyFilesTo(hdfs:///tmp/_dst): src=hdfs://ip-172-31-2-12.us-east-2.compute.internal:8020/tmp/_src/one.file target=hdfs://ip-172-31-2-12.us-east-2.compute.internal:8020/tmp/_src/one.file
I located the bug in the rebase() method: it doesn't work correctly when running in the EMR cluster, because RemoteIterator returns URIs in the fully qualified remote format, hdfs://ip-172-31-2-12.us-east-2.compute.internal:8020/tmp/_src/one.file, while the method expects the hdfs:///tmp/_src/one.file format. That is also why it works locally with the file:/// FS.
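A quick standalone check with plain java.net.URI (a sketch added here for illustration, not part of the original class) shows the failure mode: URI.relativize() returns the given URI unchanged when the authorities of the two URIs differ, so the computed target collapses back to the source path, which matches the target=src line in the log above.

import java.net.URI;

public final class RelativizeDemo {
    public static void main(final String[] args) {
        final URI root = URI.create("hdfs:///tmp/_src");
        final URI qualified = URI.create(
            "hdfs://ip-172-31-2-12.us-east-2.compute.internal:8020/tmp/_src/one.file"
        );
        // Authorities differ (empty vs. namenode host:port), so relativize()
        // returns the full source URI instead of the relative "one.file".
        System.out.println(root.relativize(qualified));
        // With matching (empty) authorities the relative part is produced.
        final URI unqualified = URI.create("hdfs:///tmp/_src/one.file");
        System.out.println(root.relativize(unqualified));
    }
}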
I don't see anything obviously wrong here.
- Does it do hdfs-hdfs or s3a-s3a?
- Upgrade your Hadoop version; 2.7.x is quite dated, especially the S3A code. It's unlikely to fix this particular problem, but it will avoid other ones. Once upgraded, switch to fast upload, which incrementally uploads large files; at the moment your code saves each file somewhere under /tmp and then uploads it in the close() call.
- Turn on logging for the org.apache.hadoop.fs.s3a module and see what it says (a configuration sketch follows after these comments).
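A minimal sketch of those two suggestions (my own illustration, not taken from the question; fs.s3a.fast.upload is the stock Hadoop 2.7.x/2.8.x switch, later releases control buffering through fs.s3a.fast.upload.buffer instead, and the log4j line is the usual way to raise the S3A log level):

import org.apache.hadoop.conf.Configuration;

public final class S3aTuning {
    public static void main(final String[] args) {
        final Configuration conf = new Configuration();
        // Upload in blocks instead of buffering each file under /tmp until close().
        conf.setBoolean("fs.s3a.fast.upload", true);
        System.out.println(conf.get("fs.s3a.fast.upload"));
        // To see what S3A is doing, add to log4j.properties:
        //   log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
    }
}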
I'm not sure this is the best and completely correct solution, but it works for me. The idea is to fix the host and port of the local paths before rebasing; the working rebase method would be:
/**
 * Change the base of target URI to new base, using root
 * as common path.
 * @param base New base
 * @param root Common root
 * @param target Target to rebase
 * @return Path with new base
 * @throws IOException If fails
 */
@SuppressWarnings("PMD.DefaultPackage")
static Path rebase(final Path base, final Path root, final Path target)
    throws IOException {
    final URI uri = target.toUri();
    try {
        return new Path(
            new Path(
                // URIBuilder: assumed to be Apache HttpClient's
                // org.apache.http.client.utils.URIBuilder.
                new URIBuilder(base.toUri())
                    .setHost(uri.getHost())
                    .setPort(uri.getPort())
                    .build()
            ),
            new Path(
                new URIBuilder(root.toUri())
                    .setHost(uri.getHost())
                    .setPort(uri.getPort())
                    .build()
                    .relativize(uri)
            )
        );
    } catch (final URISyntaxException err) {
        throw new IOException("Failed to rebase", err);
    }
}
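For reference, a quick check of the fixed method against the paths from the cluster logs above (my own sketch; it assumes the class lives in the same package as Folder, since rebase() is package-private):

import java.io.IOException;
import org.apache.hadoop.fs.Path;

public final class RebaseCheck {
    public static void main(final String[] args) throws IOException {
        final Path base = new Path("hdfs:///tmp/_dst");
        final Path root = new Path("hdfs:///tmp/_src");
        final Path target = new Path(
            "hdfs://ip-172-31-2-12.us-east-2.compute.internal:8020/tmp/_src/one.file"
        );
        // Should print the destination with the qualified authority carried over:
        // hdfs://ip-172-31-2-12.us-east-2.compute.internal:8020/tmp/_dst/one.file
        System.out.println(Folder.rebase(base, root, target));
    }
}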