Apache Beam Python ReadFromText Regex
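I want to read two given days of data from GCS. My folder structure is `sensors/<date>/<hash>/x.csv.gz`, and I want to read the files for "20171104" and "20171105". Using the pattern `sensors/[20171104,20171105]/<hash>/*` does not work. Does anyone know the best way to handle this with `beam.io.ReadFromText`?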
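A likely reason the bracket pattern fails: as far as I understand, Beam matches file patterns with glob syntax based on Python's `fnmatch` (with the difference that Beam's `*` does not cross `/`), not with regular expressions. In glob syntax, `[20171104,20171105]` is a character class that matches exactly one character, not one of two whole strings. The sketch below uses the standard-library `fnmatch` module as a stand-in for Beam's matcher; the object names are hypothetical, and `abc123` (or `*` in the patterns) stands in for `<hash>`.

```python
import fnmatch

# Hypothetical object names following sensors/<date>/<hash>/x.csv.gz;
# "abc123" stands in for the real <hash> subfolder.
paths = [
    "sensors/20171104/abc123/x.csv.gz",
    "sensors/20171105/abc123/x.csv.gz",
    "sensors/2/abc123/x.csv.gz",
]

# The question's pattern, with the <hash> level written as "*" for this demo.
bracket_pattern = "sensors/[20171104,20171105]/*/*"

# [...] matches ONE character from the set {2, 0, 1, 7, 4, ',', 5},
# so only the single-character "date" folder "2" matches.
matches = [p for p in paths if fnmatch.fnmatchcase(p, bracket_pattern)]
print(matches)  # ['sensors/2/abc123/x.csv.gz']

# One pattern per day matches the intended files.
day_patterns = ["sensors/%s/*/*" % day for day in ("20171104", "20171105")]
per_day = [p for p in paths
           if any(fnmatch.fnmatchcase(p, pat) for pat in day_patterns)]
print(per_day)  # the two dated paths
```

This is why reading each day with its own pattern, as in the answer, works where the single bracket pattern does not.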
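I've figured out how to read the intended days of data without a wildcard in the date part, by writing a Python function instead. The idea is to build a list with one read transform per day, then flatten that list into a single PCollection and use it as the input to the rest of the pipeline.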
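```python
import logging
from datetime import datetime, timedelta

import apache_beam as beam


def read_files(pipeline, intended_day):
    """Return a list of PCollections, one per day: intended_day and the day before."""
    collections = []
    previous_day = (datetime.strptime(intended_day, '%Y%m%d') - timedelta(days=1)).strftime('%Y%m%d')
    days = [intended_day, previous_day]
    # <hash> is a placeholder for the hash-named subfolder in the bucket.
    path = "gs://sensors/{}/<hash>/*"
    for day in days:
        try:
            file_name = path.format(day)
            # Every transform label must be unique, so the day goes into the label.
            collection = pipeline | ('Read Past for %s' % day) >> beam.io.ReadFromText(file_name)
            collections.append(collection)
        except IOError:
            # With the default validate=True, ReadFromText raises IOError at
            # pipeline-construction time if no files match the pattern.
            logging.error("Failed to read for day %s", day)
    return collections
```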
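The hard-coded `[intended_day, previous_day]` list generalizes to any number of consecutive days. A minimal sketch, using hypothetical helper names `days_back` and `day_patterns` (they are not part of Beam); the `<hash>` placeholder is kept from the original path and still has to be replaced with the real subfolder or a wildcard:

```python
from datetime import datetime, timedelta


def days_back(intended_day, num_days):
    """Return num_days date strings (YYYYMMDD), starting at intended_day and going backwards."""
    start = datetime.strptime(intended_day, "%Y%m%d")
    return [(start - timedelta(days=i)).strftime("%Y%m%d") for i in range(num_days)]


def day_patterns(intended_day, num_days, template="gs://sensors/{}/<hash>/*"):
    """Return one file pattern per day, filled into a (hypothetical) path template."""
    return [template.format(day) for day in days_back(intended_day, num_days)]


print(days_back("20171105", 2))  # ['20171105', '20171104']
print(days_back("20170301", 3))  # ['20170301', '20170228', '20170227']
```

Inside `read_files`, `days = days_back(intended_day, 2)` produces the same list as the original two lines, so the rest of the function stays unchanged; only `num_days` needs to grow to cover a longer range.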
Then call the function in your pipeline like this:
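```python
# runner and argv come from your own pipeline setup.
p = beam.Pipeline(runner=runner, argv=argv)
intended_day = "20170810"
pcollections = read_files(p, intended_day)
# Merge the per-day PCollections into a single PCollection.
result = ((pcollections | "Flatten sensor" >> beam.Flatten())
          | .....
          )
```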