在 Google 数据流模板中将 'date' 作为运行时参数传入

Passing in 'date' as a runtime argument in Google Dataflow Template

我目前正在尝试生成一个 Google 数据流自定义模板,它将在 运行 时调用 API,并将结果写入 BigQuery table .

但是我遇到的问题是 API 需要传入日期参数​​ 'YYYY-MM-DD' 才能正常工作。

不幸的是,在构建模板时,Dataflow 似乎要求您使用 ValueProvider (as described here) for any variables that are relative to when the job is being run (i.e. today's date). Otherwise it'll just carry on using the same date that was generated when the template was originally created. (i.e. with dt.date.today() etc - h/t to this post)

因此,对于我得到的代码,有什么方法可以生成模板,以便它可以在 运行 时间正确地使用今天的日期作为参数,而不是仅仅使用相同的静态日期无限期地 - 或目前的情况 - 只是根本不转换为模板。

from __future__ import print_function, absolute_import
import argparse
import logging
import sys

import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.metrics.metric import Metrics
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions
from apache_beam.options.value_provider import ValueProvider

import datetime as dt
from datetime import timedelta, date
import time
import re

logging.getLogger().setLevel(logging.INFO)

class GetAPI():
  def __init__(self, data={}, date=None):
    self.num_api_errors = Metrics.counter(self.__class__, 'num_api_errors')
    self.data = data
    self.date = date

  def get_job(self):
    import requests
    endpoint = f'https://www.rankranger.com/api/v2/?rank_stats&key={self.data.api_key}&date={self.date}'\
               f'&campaign_id={self.data.campaign}&se_id={self.data.se}&domain={self.data.domain}&output=json'
    logging.info("Endpoint: {}".format(str(endpoint)))
    try:
      res = requests.get(endpoint)
      if res.status_code == 200:
        # logging.info("Reponse: {}".format(str(res.text)))
        json_data = res.json()
        ## Store the API response
        if 'result' in json_data:
          response = json_data.get('result')
          return response

    except Exception as e:
      self.num_api_errors.inc()
      logging.error(f'Exception: {e}')
      logging.error(f'Extract error on "%s"', 'Rank API')


def format_dates(api):
  api['date'] = dt.datetime.strptime(api['date'], "%m/%d/%Y").strftime("%Y-%m-%d")
  return api


# Class to pass in date generated at runtime to template
class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        ## Special runtime argument e.g. date
        parser.add_value_provider_argument('--date',
            type=str,
            default=(dt.date.today()).strftime("%Y-%m-%d"),
            help='Run date in YYYY-MM-DD format.')


def run(argv=None):
    """
      Main entry point; defines the static arguments to be passed in.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--api_key',
        type=str,
        default=API_KEY,
        help='API key for Rank API.')
    parser.add_argument('--campaign',
        type=str,
        default=CAMPAIGN,
        help='Campaign ID for Rank API')
    parser.add_argument('--se',
        type=str,
        default=SE,
        help='Search Engine ID for Rank API')
    parser.add_argument('--domain',
        type=str,
        default=DOMAIN,
        help='Domain for Rank API')
    parser.add_argument('--dataset',
        type=str,
        default=DATASET,
        help='BigQuery Dataset to write tables to. Must already exist.')
    parser.add_argument('--table_name',
        type=str,
        default=TABLE_NAME,
        help='The BigQuery table name. Should not already exist.')
    parser.add_argument('--project',
        type=str,
        default=PROJECT,
        help='Your GCS project.')
    parser.add_argument('--runner',
        type=str,
        default="DataflowRunner",
        help='Type of DataFlow runner.')

    args, pipeline_args = parser.parse_known_args(argv)

    # Create and set your PipelineOptions.
    options = PipelineOptions(pipeline_args)
    user_options = options.view_as(UserOptions)

    pipeline = beam.Pipeline(options=options)

    # Gets data from Rank Ranger API
    api = (
        pipeline
        | 'create' >> beam.Create(GetAPI(data=args, date=user_options.date).get_job())
        | 'format dates' >> beam.Map(format_dates)
    )

    # Write to bigquery based on specified schema
    BQ = (api | "WriteToBigQuery" >> beam.io.WriteToBigQuery(args.table_name, args.dataset, SCHEMA))

    pipeline.run()


if __name__ == '__main__':
    run()

正如您从错误消息中看到的那样,它没有传递格式整齐的 'YYYY-MM-DD' 参数,而是传递了完整的 ValueProvider 对象,该对象停止了 API 调用的工作并返回NoneType 错误。

(Apache) C:\Users\user.name\Documents\Alchemy\Dataflow\production_pipeline\templates>python main.py --runner DataflowRunner --project <PROJECT> --staging_location gs://<STORAGE-BUCKET>/staging --temp_location gs://<STORAGE-BUCKET>/temp --template_location gs://<STORAGE-BUCKET>/template/<TEMPLATE> --region europe-west2
INFO:root:Endpoint: https://www.rankranger.com/api/v2/?rank_stats&key=<API_KEY>&date=RuntimeValueProvider(option: date, type: str, default_value: '2020-08-25')&campaign_id=<CAMPAIGN>&se_id=<SE>&domain=<DOMAIN>&output=json
Traceback (most recent call last):
  File "main.py", line 267, in <module>
    run()
  File "main.py", line 257, in run
    | 'format dates' >> beam.Map(format_dates)
  File "C:\Users\user.name\Anaconda3\envs\Apache\lib\site-packages\apache_beam\transforms\core.py", line 2590, in __init__
    self.values = tuple(values)
TypeError: 'NoneType' object is not iterable

任何帮助将不胜感激!

您的诊断是正确的。您应该考虑迁移到 Flex Templates,它可以解决这个(和其他)问题并提供更大的灵活性。