创建 DAG 数据流 (apache Beam)
Create DAG Dataflow (apache Beam)
我正在 Dataflow (Apache beam) 上创建一个管道以在 Google BigQuery 上读取和写入数据,但是我在创建 DAG 时遇到了问题,就像我在 Airflow 中所做的那样。
这是我的代码中的一个示例:
# define pipeline
p = beam.Pipeline(argv=pipeline_args)
# execute query_1
query_result_gps = ( p | 'ReadFromBQ GPS_data' >> ... )
# write result from query_1 on BigQuery
output_gps = ( query_result_gps | 'WriteToBQ GPS_data' >> ... )
# execute query_2
query_result_temperature = (output_gps
| 'ReadFromBQ temperature_data' >> ... )
# write result from query_2
ouput_temperature = ( query_result_temperature | 'WriteToBQ temperature_data' >> ... )
我希望按顺序执行这些任务,而 Dataflow 会并行执行它们
如何让它们按顺序执行?
我假设您正在像这样从大查询中读取数据:
count = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input_table))
我深入研究了 apache_beam 源代码,看起来他们的源代码转换忽略了输入 pcollection,这就是他们并行设置的原因。
见def expand(self, pbegin):
的最后一行:
class Read(ptransform.PTransform):
"""A transform that reads a PCollection."""
def __init__(self, source):
"""Initializes a Read transform.
Args:
source: Data source to read from.
"""
super(Read, self).__init__()
self.source = source
def expand(self, pbegin):
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.transforms import util
assert isinstance(pbegin, pvalue.PBegin)
self.pipeline = pbegin.pipeline
debug_options = self.pipeline._options.view_as(DebugOptions)
if debug_options.experiments and 'beam_fn_api' in debug_options.experiments:
source = self.source
def split_source(unused_impulse):
total_size = source.estimate_size()
if total_size:
# 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards
chunk_size = max(1 << 20, 1000 * int(math.sqrt(total_size)))
else:
chunk_size = 64 << 20 # 64mb
return source.split(chunk_size)
return (
pbegin
| core.Impulse()
| 'Split' >> core.FlatMap(split_source)
| util.Reshuffle()
| 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
split.source.get_range_tracker(
split.start_position, split.stop_position))))
else:
# Treat Read itself as a primitive.
return pvalue.PCollection(self.pipeline)
# ... other methods
看起来如果您设置此实验性 beam_fn_api
管道 debug_option 那么 pbegin
将实际使用,但我不确定该选项的其他影响是什么。
为什么需要它们按顺序发生?您似乎是在写一个 table 然后从另一个读?
如果你真的需要它按顺序发生,也许可以像这样子类化它 Read
就可以了
class SequentialRead(Read):
def expand(self, pbegin):
return pbegin
由于您希望将中间步骤输出到 BigQuery 并在两个转换之间流动数据,我认为 Branch 可以实现您想要的结果。
PCollection_1 =(从 BQ 读取).apply(Transform_1)
PCollection_1.申请(写到BQ)
PCollection_1.apply(Transform_2).apply(写入BQ)
这将允许您在元素经过 Transform_1 之后应用 Transform_2 并将该中间步骤写入 BQ。通过对同一个 PCollection 应用多个 ParDo,您可以在 DAG 中生成不同的分支。
我正在 Dataflow (Apache beam) 上创建一个管道以在 Google BigQuery 上读取和写入数据,但是我在创建 DAG 时遇到了问题,就像我在 Airflow 中所做的那样。
这是我的代码中的一个示例:
# define pipeline
p = beam.Pipeline(argv=pipeline_args)
# execute query_1
query_result_gps = ( p | 'ReadFromBQ GPS_data' >> ... )
# write result from query_1 on BigQuery
output_gps = ( query_result_gps | 'WriteToBQ GPS_data' >> ... )
# execute query_2
query_result_temperature = (output_gps
| 'ReadFromBQ temperature_data' >> ... )
# write result from query_2
ouput_temperature = ( query_result_temperature | 'WriteToBQ temperature_data' >> ... )
我希望按顺序执行这些任务,而 Dataflow 会并行执行它们
如何让它们按顺序执行?
我假设您正在像这样从大查询中读取数据:
count = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input_table))
我深入研究了 apache_beam 源代码,看起来他们的源代码转换忽略了输入 pcollection,这就是他们并行设置的原因。
见def expand(self, pbegin):
的最后一行:
class Read(ptransform.PTransform):
"""A transform that reads a PCollection."""
def __init__(self, source):
"""Initializes a Read transform.
Args:
source: Data source to read from.
"""
super(Read, self).__init__()
self.source = source
def expand(self, pbegin):
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.transforms import util
assert isinstance(pbegin, pvalue.PBegin)
self.pipeline = pbegin.pipeline
debug_options = self.pipeline._options.view_as(DebugOptions)
if debug_options.experiments and 'beam_fn_api' in debug_options.experiments:
source = self.source
def split_source(unused_impulse):
total_size = source.estimate_size()
if total_size:
# 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards
chunk_size = max(1 << 20, 1000 * int(math.sqrt(total_size)))
else:
chunk_size = 64 << 20 # 64mb
return source.split(chunk_size)
return (
pbegin
| core.Impulse()
| 'Split' >> core.FlatMap(split_source)
| util.Reshuffle()
| 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
split.source.get_range_tracker(
split.start_position, split.stop_position))))
else:
# Treat Read itself as a primitive.
return pvalue.PCollection(self.pipeline)
# ... other methods
看起来如果您设置此实验性 beam_fn_api
管道 debug_option 那么 pbegin
将实际使用,但我不确定该选项的其他影响是什么。
为什么需要它们按顺序发生?您似乎是在写一个 table 然后从另一个读?
如果你真的需要它按顺序发生,也许可以像这样子类化它 Read
就可以了
class SequentialRead(Read):
def expand(self, pbegin):
return pbegin
由于您希望将中间步骤输出到 BigQuery 并在两个转换之间流动数据,我认为 Branch 可以实现您想要的结果。
PCollection_1 =(从 BQ 读取).apply(Transform_1)
PCollection_1.申请(写到BQ)
PCollection_1.apply(Transform_2).apply(写入BQ)
这将允许您在元素经过 Transform_1 之后应用 Transform_2 并将该中间步骤写入 BQ。通过对同一个 PCollection 应用多个 ParDo,您可以在 DAG 中生成不同的分支。