创建 DAG 数据流 (apache Beam)

Create DAG Dataflow (apache Beam)

我正在 Dataflow (Apache beam) 上创建一个管道以在 Google BigQuery 上读取和写入数据,但是我在创建 DAG 时遇到了问题,就像我在 Airflow 中所做的那样。

这是我的代码中的一个示例:

# define pipeline
p = beam.Pipeline(argv=pipeline_args)
# execute query_1
query_result_gps = ( p | 'ReadFromBQ GPS_data' >> ... )
# write result from query_1 on BigQuery
output_gps = ( query_result_gps | 'WriteToBQ GPS_data' >> ... )
# execute query_2
query_result_temperature = (output_gps 
                                    | 'ReadFromBQ temperature_data' >> ... )
# write result from query_2
ouput_temperature = ( query_result_temperature | 'WriteToBQ temperature_data' >> ... )

我希望按顺序执行这些任务,而 Dataflow 会并行执行它们

如何让它们按顺序执行?

我假设您正在像这样从大查询中读取数据:

count = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input_table))

我深入研究了 apache_beam 源代码,看起来他们的源代码转换忽略了输入 pcollection,这就是他们并行设置的原因。

def expand(self, pbegin):的最后一行:

class Read(ptransform.PTransform):
  """A transform that reads a PCollection."""

  def __init__(self, source):
    """Initializes a Read transform.

    Args:
      source: Data source to read from.
    """
    super(Read, self).__init__()
    self.source = source

  def expand(self, pbegin):
    from apache_beam.options.pipeline_options import DebugOptions
    from apache_beam.transforms import util

    assert isinstance(pbegin, pvalue.PBegin)
    self.pipeline = pbegin.pipeline

    debug_options = self.pipeline._options.view_as(DebugOptions)
    if debug_options.experiments and 'beam_fn_api' in debug_options.experiments:
      source = self.source

      def split_source(unused_impulse):
        total_size = source.estimate_size()
        if total_size:
          # 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards
          chunk_size = max(1 << 20, 1000 * int(math.sqrt(total_size)))
        else:
          chunk_size = 64 << 20  # 64mb
        return source.split(chunk_size)

      return (
          pbegin
          | core.Impulse()
          | 'Split' >> core.FlatMap(split_source)
          | util.Reshuffle()
          | 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
              split.source.get_range_tracker(
                  split.start_position, split.stop_position))))
    else:
      # Treat Read itself as a primitive.
      return pvalue.PCollection(self.pipeline)

# ... other methods

看起来如果您设置此实验性 beam_fn_api 管道 debug_option 那么 pbegin 将实际使用,但我不确定该选项的其他影响是什么。

为什么需要它们按顺序发生?您似乎是在写一个 table 然后从另一个读?

如果你真的需要它按顺序发生,也许可以像这样子类化它 Read 就可以了

class SequentialRead(Read):
  def expand(self, pbegin):
      return pbegin

由于您希望将中间步骤输出到 BigQuery 并在两个转换之间流动数据,我认为 Branch 可以实现您想要的结果。

PCollection_1 =(从 BQ 读取).apply(Transform_1)

PCollection_1.申请(写到BQ)

PCollection_1.apply(Transform_2).apply(写入BQ)

这将允许您在元素经过 Transform_1 之后应用 Transform_2 并将该中间步骤写入 BQ。通过对同一个 PCollection 应用多个 ParDo,您可以在 DAG 中生成不同的分支。