如何自动化 BigQuery SQL 管道

How to automate a BigQuery SQL pipeline

我使用 BigQuery SQL 创建了一个数据管道。 它首先从 Cloud Storage 导入 CSV 文件,然后进行不同的分析,包括使用 BigQueryML 的预测建模 使用地理函数的地理计算,以及 使用分析函数计算 KPI。

我能够手动成功运行不同的查询,现在我想自动化数据管道。

我的第一选择是 DataFlow SQL 但事实证明 Dataflow SQL 查询语法不支持地理函数。

DataFlow python 不是一个选项,因为完整的分析是在 SQL 中完成的,我想保持这种方式。

我的问题是还有哪些其他 GCP 选项可用于自动化数据管道。

BigQuery 有一个内置的调度机制,目前处于测试阶段。

要自动化 BQ 本机 SQL 管道,您可以使用此实用程序。 使用 CLI:

$ bq query \
--use_legacy_sql=false \
--destination_table=mydataset.mytable \
--display_name='My Scheduled Query' \
--replace=true \
'SELECT
1
FROM
mydataset.test'

正如我在评论中提到的,如果您需要编排查询,您可以使用 Cloud Composer,一个完全托管的 Airflow 集群。

我创建了下面的代码,或多或少地向您展示了如何使用此工具编排您的查询。请注意,这是一个基本代码,可以根据编码标准进行改进。 该代码基本上编排了 3 个查询:

  1. 第一个从 public table 读取并写入您项目中的另一个 table
  2. 第二个读取第一个查询中创建的 table 和 select 基于日期列的 10000 个最新行。之后,它将结果保存到项目中的 table。
  3. 第三个读取步骤2中创建的table并计算一些聚合。之后,它会将结果保存到您项目中的另一个 table。

    import datetime
    from airflow import models
    from airflow.contrib.operators import bigquery_operator
    
    """The condiguration presented below will run your DAG every five minutes as specified in the 
    schedule_interval property starting from the datetime specified in the start_date property"""
    
    default_dag_args = {
        'start_date': datetime.datetime(2020, 4, 22, 15, 40), 
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=1),
        'project_id': "<your_project_id>",
    }
    
    with models.DAG(
            'composer_airflow_bigquery_orchestration',
            schedule_interval = "*/5 * * * *",
            default_args=default_dag_args) as dag:
    
        run_first_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `bigquery-public-data.catalonian_mobile_coverage.mobile_data_2015_2017`",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_1",
            task_id = 'xxxxxxxx',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_second_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `<your_project>.orchestration_1` ORDER BY date LIMIT 10000 ",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_2",
            task_id = 'yyyyyyyy',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_third_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT round(lat) r_lat, round(long) r_long, count(1) total FROM`<your_project>.orchestration_2` GROUP BY r_lat,r_long",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_3",
            task_id = 'zzzzzzzz',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
    
       # Define DAG dependencies.
        run_first_query >> run_second_query >> run_third_query
    

一步一步来:

  • 首先,它导入了一些 Airflow 库,例如模型和 bigquery_operator

    from airflow import models
    from airflow.contrib.operators import bigquery_operator
    
  • 然后它定义了一个名为 default_dag_args 的字典,将在您创建 DAG 时进一步使用。

    default_dag_args = {
        'start_date': datetime.datetime(2020, 4, 22, 15, 40), 
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=1),
        'project_id': "<your_project_id>",
    }
    
  • 当您创建 DAG 时,您将 default_dag_args 字典作为默认参数传递并添加 schedule interval 参数,该参数将定义您的 DAG 何时应为 运行.您可以将此参数与一些预设表达式一起使用或使用 CRON 表达式,如您所见 here

    with models.DAG(
            'composer_airflow_bigquery_orchestration',
            schedule_interval = "*/5 * * * *",
            default_args=default_dag_args) as dag:
    
  • 之后,您可以创建您的操作员实例。在这种情况下,我们仅使用 BigQueryOperator

        run_first_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `bigquery-public-data.catalonian_mobile_coverage.mobile_data_2015_2017`",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_1",
            task_id = 'xxxxxxxx',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_second_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `<your_project>.orchestration_1` ORDER BY date LIMIT 10000 ",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_2",
            task_id = 'yyyyyyyy',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_third_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT round(lat) r_lat, round(long) r_long, count(1) total FROM`<your_project>.orchestration_2` GROUP BY r_lat,r_long",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_3",
            task_id = 'zzzzzzzz',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
  • 作为最后一步,我们可以定义 DAG 的依赖项。这段代码意味着 run_second_query 操作取决于 run_first_query 的结论,所以它是。

        run_first_query >> run_second_query >> run_third_query
    

最后,我想添加这个 article,讨论如何在使用 CRON 表达式时正确设置 start_date 和 schedule_interval。