旁加载静态数据
Sideload static data
在 ParDo 中处理我的数据时,我需要使用存储在 Google 云存储上的 JSON 架构。我认为这可能是侧载?我阅读了他们称为文档 (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.pvalue.html) 的页面,其中包含有关 apache_beam.pvalue.AsSingleton
和 apache_beam.pvalue.AsSideInput
的内容,但如果我 Google 了解它们的用法,则结果为零,我可以找不到 Python.
的任何示例
如何从 ParDo 中的存储读取文件?或者我是否在 ParDo 之前侧载到我的管道,但我如何在 ParDo 中利用这第二个源?
[编辑]
我的主要数据来自BQ:beam.io.Read(beam.io.BigQuerySource(...
side input也来自BQ,同样使用BigQuerySource
。
当我在主数据端输入其他数据后添加一个步骤时,出现了一些奇怪的错误。我注意到,当我对侧面输入执行 beam.Map(lambda x: x)
时,它会起作用。
侧输入
schema_data = (p | "read schema data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select * from `{schema_table}` limit 1", use_standard_sql=True, flatten_results=True))
| beam.Map(lambda x: x)
)
主要数据
source_data = (p | "read source data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select {columns} from `{source_table}` limit 10", use_standard_sql=True, flatten_results=True)))
合并
validated_records = source_data | 'record validation' >> beam.ParDo(Validate(), pvalue.AsList(schema_data))
我发现了一个类似的问题here。就此 post 评论而言,如果您的架构文件(在本例中为 JSON)位于 GCS 中的已知位置,您可以将 ParDo
添加到直接从中读取的管道GCS 使用 start_bundle()
实现。
如果您需要抽象出用于存储架构文件(不仅仅是 GCS)的文件系统,则可以使用 Beam 的 FileSystem
抽象。
此外,您可以使用 Google Cloud Storage’s API.
从存储中 read/download 文件
我还找到了 here 一篇博客,其中讨论了使用 Google Cloud Dataflow 时不同的源阅读模式。
希望对您有所帮助。
我会使用您提到的文档作为库参考,并阅读 Beam 编程指南以获得更详细的演练:side input section。我将尝试提供几个示例,其中我们将从 public table 下载 BigQuery 架构并将其上传到 GCS:
bq show --schema bigquery-public-data:usa_names.usa_1910_current > schema.json
gsutil cp schema.json gs://$BUCKET
我们的数据将是一些没有 headers 的 csv 行,因此我们必须使用 GCS 模式:
data = [('NC', 'F', 2020, 'Hello', 3200),
('NC', 'F', 2020, 'World', 3180)]
使用侧输入
我们将 JSON 文件读入 schema
PCollection:
schema = (p
| 'Read Schema from GCS' >> ReadFromText('gs://{}/schema.json'.format(BUCKET)))
然后我们将它作为辅助输入传递给 ParDo
,以便它被广播给执行 DoFn
的每个工作人员。在这种情况下,我们可以使用 AsSingleton
因为我们只想将模式作为单个值提供:
(p
| 'Create Events' >> beam.Create(data) \
| 'Enrich with side input' >> beam.ParDo(EnrichElementsFn(), pvalue.AsSingleton(schema)) \
| 'Log elements' >> beam.ParDo(LogElementsFn()))
现在我们可以在EnrichElementsFn
的process
方法中访问schema
:
class EnrichElementsFn(beam.DoFn):
"""Zips data with schema stored in GCS"""
def process(self, element, schema):
field_names = [x['name'] for x in json.loads(schema)]
yield zip(field_names, element)
请注意,最好在将其保存为单例之前进行架构处理(构造 field_names
)以避免重复工作,但这只是一个说明性示例。
使用启动包
在这种情况下,我们不会将任何额外的输入传递给 ParDo
:
(p
| 'Create Events' >> beam.Create(data) \
| 'Enrich with start bundle' >> beam.ParDo(EnrichElementsFn()) \
| 'Log elements' >> beam.ParDo(LogElementsFn()))
现在我们使用 Python 客户端库(我们需要安装 google-cloud-storage
)在每次工作人员初始化包时读取模式:
class EnrichElementsFn(beam.DoFn):
"""Zips data with schema stored in GCS"""
def start_bundle(self):
from google.cloud import storage
client = storage.Client()
blob = client.get_bucket(BUCKET).get_blob('schema.json')
self.schema = blob.download_as_string()
def process(self, element):
field_names = [x['name'] for x in json.loads(self.schema)]
yield zip(field_names, element)
两种情况下的输出相同:
INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'Hello'), (u'number', 3200)]
INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'World'), (u'number', 3180)]
使用 2.16.0 SDK 和 DirectRunner
测试。
两个示例的完整代码here。
在 ParDo 中处理我的数据时,我需要使用存储在 Google 云存储上的 JSON 架构。我认为这可能是侧载?我阅读了他们称为文档 (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.pvalue.html) 的页面,其中包含有关 apache_beam.pvalue.AsSingleton
和 apache_beam.pvalue.AsSideInput
的内容,但如果我 Google 了解它们的用法,则结果为零,我可以找不到 Python.
如何从 ParDo 中的存储读取文件?或者我是否在 ParDo 之前侧载到我的管道,但我如何在 ParDo 中利用这第二个源?
[编辑]
我的主要数据来自BQ:beam.io.Read(beam.io.BigQuerySource(...
side input也来自BQ,同样使用BigQuerySource
。
当我在主数据端输入其他数据后添加一个步骤时,出现了一些奇怪的错误。我注意到,当我对侧面输入执行 beam.Map(lambda x: x)
时,它会起作用。
侧输入
schema_data = (p | "read schema data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select * from `{schema_table}` limit 1", use_standard_sql=True, flatten_results=True))
| beam.Map(lambda x: x)
)
主要数据
source_data = (p | "read source data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select {columns} from `{source_table}` limit 10", use_standard_sql=True, flatten_results=True)))
合并
validated_records = source_data | 'record validation' >> beam.ParDo(Validate(), pvalue.AsList(schema_data))
我发现了一个类似的问题here。就此 post 评论而言,如果您的架构文件(在本例中为 JSON)位于 GCS 中的已知位置,您可以将 ParDo
添加到直接从中读取的管道GCS 使用 start_bundle()
实现。
如果您需要抽象出用于存储架构文件(不仅仅是 GCS)的文件系统,则可以使用 Beam 的 FileSystem
抽象。
此外,您可以使用 Google Cloud Storage’s API.
从存储中 read/download 文件我还找到了 here 一篇博客,其中讨论了使用 Google Cloud Dataflow 时不同的源阅读模式。
希望对您有所帮助。
我会使用您提到的文档作为库参考,并阅读 Beam 编程指南以获得更详细的演练:side input section。我将尝试提供几个示例,其中我们将从 public table 下载 BigQuery 架构并将其上传到 GCS:
bq show --schema bigquery-public-data:usa_names.usa_1910_current > schema.json
gsutil cp schema.json gs://$BUCKET
我们的数据将是一些没有 headers 的 csv 行,因此我们必须使用 GCS 模式:
data = [('NC', 'F', 2020, 'Hello', 3200),
('NC', 'F', 2020, 'World', 3180)]
使用侧输入
我们将 JSON 文件读入 schema
PCollection:
schema = (p
| 'Read Schema from GCS' >> ReadFromText('gs://{}/schema.json'.format(BUCKET)))
然后我们将它作为辅助输入传递给 ParDo
,以便它被广播给执行 DoFn
的每个工作人员。在这种情况下,我们可以使用 AsSingleton
因为我们只想将模式作为单个值提供:
(p
| 'Create Events' >> beam.Create(data) \
| 'Enrich with side input' >> beam.ParDo(EnrichElementsFn(), pvalue.AsSingleton(schema)) \
| 'Log elements' >> beam.ParDo(LogElementsFn()))
现在我们可以在EnrichElementsFn
的process
方法中访问schema
:
class EnrichElementsFn(beam.DoFn):
"""Zips data with schema stored in GCS"""
def process(self, element, schema):
field_names = [x['name'] for x in json.loads(schema)]
yield zip(field_names, element)
请注意,最好在将其保存为单例之前进行架构处理(构造 field_names
)以避免重复工作,但这只是一个说明性示例。
使用启动包
在这种情况下,我们不会将任何额外的输入传递给 ParDo
:
(p
| 'Create Events' >> beam.Create(data) \
| 'Enrich with start bundle' >> beam.ParDo(EnrichElementsFn()) \
| 'Log elements' >> beam.ParDo(LogElementsFn()))
现在我们使用 Python 客户端库(我们需要安装 google-cloud-storage
)在每次工作人员初始化包时读取模式:
class EnrichElementsFn(beam.DoFn):
"""Zips data with schema stored in GCS"""
def start_bundle(self):
from google.cloud import storage
client = storage.Client()
blob = client.get_bucket(BUCKET).get_blob('schema.json')
self.schema = blob.download_as_string()
def process(self, element):
field_names = [x['name'] for x in json.loads(self.schema)]
yield zip(field_names, element)
两种情况下的输出相同:
INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'Hello'), (u'number', 3200)]
INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'World'), (u'number', 3180)]
使用 2.16.0 SDK 和 DirectRunner
测试。
两个示例的完整代码here。