在spark中saveAsTextFile时如何命名文件?

How to name file when saveAsTextFile in spark?

在 spark 版本 1.5.1 中保存为文本文件时,我使用:rdd.saveAsTextFile('<drectory>')

但是如果我想在该目录中找到该文件,我该如何命名它呢?

目前,我认为它被命名为part-00000,这一定是一些默认的。我该如何给它起个名字?

正如我在上面的评论中所说,可以找到带有示例的文档 here。并引用方法的描述saveAsTextFile:

Save this RDD as a text file, using string representations of elements.

在下面的示例中,我将一个简单的 RDD 保存到一个文件中,然后加载它并打印其内容。

samples = sc.parallelize([
    ("abonsanto@fakemail.com", "Alberto", "Bonsanto"),
    ("mbonsanto@fakemail.com", "Miguel", "Bonsanto"),
    ("stranger@fakemail.com", "Stranger", "Weirdo"),
    ("dbonsanto@fakemail.com", "Dakota", "Bonsanto")
])

print samples.collect()

samples.saveAsTextFile("folder/here.txt")
read_rdd = sc.textFile("folder/here.txt")

read_rdd.collect()

输出将是

('abonsanto@fakemail.com', 'Alberto', 'Bonsanto')
('mbonsanto@fakemail.com', 'Miguel', 'Bonsanto')
('stranger@fakemail.com', 'Stranger', 'Weirdo')
('dbonsanto@fakemail.com', 'Dakota', 'Bonsanto')

[u"('abonsanto@fakemail.com', 'Alberto', 'Bonsanto')",
 u"('mbonsanto@fakemail.com', 'Miguel', 'Bonsanto')",
 u"('stranger@fakemail.com', 'Stranger', 'Weirdo')",
 u"('dbonsanto@fakemail.com', 'Dakota', 'Bonsanto')"]

让我们看看使用基于 Unix 的终端。

usr@host:~/folder/here.txt$ cat *
('abonsanto@fakemail.com', 'Alberto', 'Bonsanto')
('mbonsanto@fakemail.com', 'Miguel', 'Bonsanto')
('stranger@fakemail.com', 'Stranger', 'Weirdo')
('dbonsanto@fakemail.com', 'Dakota', 'Bonsanto')

这个问题的正确答案是 saveAsTextFile 不允许您命名实际文件。

这样做的原因是数据是分区的,并且在作为调用 saveAsTextFile(...) 的参数给出的路径中,它会将其视为一个目录,然后每个分区写入一个文件。

您可以调用 rdd.coalesce(1).saveAsTextFile('/some/path/somewhere'),它将创建 /some/path/somewhere/part-0000.txt

如果您需要比这更多的控制,您需要在执行 rdd.collect().

之后在您的终端执行实际的文件操作

注意,这会将所有数据拉入一个执行程序,因此您可能 运行 会遇到内存问题。这就是你承担的风险。

无法像@nod 所说的那样命名文件。但是,可以在之后立即重命名文件。使用 PySpark 的示例:

sc._jsc.hadoopConfiguration().set(
    "mapred.output.committer.class",
    "org.apache.hadoop.mapred.FileOutputCommitter")
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
fs = FileSystem.get(URI("s3://{bucket_name}"), sc._jsc.hadoopConfiguration())
file_path = "s3://{bucket_name}/processed/source={source_name}/year={partition_year}/week={partition_week}/"
# remove data already stored if necessary
fs.delete(Path(file_path))

df.saveAsTextFile(file_path, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

# rename created file
created_file_path = fs.globStatus(Path(file_path + "part*.gz"))[0].getPath()
fs.rename(
    created_file_path,
    Path(file_path + "{desired_name}.jl.gz"))