使用数据流在 gcs 存储桶上按大小列出文件夹

List folders by size on gcs bucket with dataflow

查看 问题的代码,我希望能够创建一个数据流管道,该管道可以查看特定 gcs 存储桶文件夹中的所有文件,并说明最终子目录的数量最多以字节为单位的数据。我会编写类似于 :

的代码
class SortFiles(beam.DoFn):
  def __init__(self, gfs):
    self.gfs = gfs

  def process(self, file_metadata):
    if file_metadata.size_in_bytes > 0:
      # Sort the files here? 


class SortFolders(beam.DoFn):
  def __init__(self, gfs):
    self.gfs = gfs

  def process(self, file_metadata):
    if file_metadata.size_in_bytes > 0:
      # Sort the folders here based on maximum addition of a combination 
      # of the file sizes and file numbers 


def delete_empty_files():

    options = PipelineOptions(...)

    gfs = gcs.GCSFileSystem(pipeline_options)
    p = beam.Pipeline(options=pipeline_options)

    discover_empty = p | 'Filenames' >> beam.Create(gfs.match(gs_folder).metadata_list)
                        | 'Reshuffle' >> beam.Reshuffle() 
                        | 'SortFilesbySize' >> beam.ParDo(SortFiles(gfs))
                        | 'SortFoldersbySize' >> beam.ParDo(SortFolders(gfs))
                        | 'OutputFolders' >> ...

我还没有决定是按字节总数还是文件夹中的文件总数来列出文件夹。我将如何解决这个问题?另一个问题是我希望能够找到最终的子目录而不是它的父文件夹来完成这个任务。

GCSFileSystem 有一个函数,du 会告诉你特定路径下的大小。 https://gcsfs.readthedocs.io/en/latest/api.html?highlight=du#gcsfs.core.GCSFileSystem

在阅读你的问题时我认为你想要

  1. 首先找到桶中所有本身不包含目录的目录(如果我理解'final subdirectories')
  2. 然后 运行 du 每个,
  3. 然后根据大小
  4. 对结果列表进行排序

如果您要对嵌套的文件进行计数:

  1. 列出所有对象,名称为 a/, a/b.txt, a/b/c.txt, 等等
  2. 写一个函数统计每个子路径下嵌套的对象