如何使用 DataprocHiveOperator 从 Hive 作业输出日志中提取查询结果?

How to extract the query result from a Hive job output logs using DataprocHiveOperator?

我正在尝试使用 Airflow 构建数据迁移管道,源是 Dataproc 集群上的 Hive table,目标是 BigQuery。我正在使用 DataprocHiveOperator 从源中获取架构和数据。此运算符在内部使用 Dataproc REST API 在我们指定的 Dataproc 集群上提交和执行作业。输出将作为作业日志的一部分写入文件 Google 云存储。我只需要这些日志的查询结果。

截至目前,我已将 gcp_dataproc_hook.py 代码修改为 return 调用方法的输出,方法是在 的帮助下将输出文件的内容下载为字符串driverOutputResourceUri 参数。此输出的 return 类型是 Pandas 数据框(可以根据我们的方便更改为任何其他类型)。但这包括完整的日志。我必须从中提取查询结果。

这是我在 gcp_dataproc_hook.py 添加到 return 提交查询的输出日志的代码片段:

    #download the output
    def getOutput(self,project, output_bucket,output_path):
        client = storage.Client(project=self.project_id)
        bucket = client.get_bucket(output_bucket)
        output_blob = ('/'.join(output_path)+"."+"000000000")
        return bucket.blob(output_blob).download_as_string()

    #get logs including query output
    def getQueryResult(self):
        result=self.job_ouput
        output = self.getOutput(result['reference']['projectId'],result['driverOutputResourceUri'].split('/')[2],result['driverOutputResourceUri'].split('/')[3:])
        df = pd.read_csv(io.BytesIO(output), sep='\n|', nrows=500000, engine='python')
        return df

这是我尝试执行的示例查询:

SHOW CREATE TABLE my_tbl;

输出日志如下所示:

Connecting to jdbc:hive2://prod-metastore-test-cluster1-m:10000
0            Connected to: Apache Hive (version 2.3.5)             
1                    Driver: Hive JDBC (version 2.3.5)             
2    Transaction isolation: TRANSACTION_REPEATABLE_...             
3    . . . . . . . . . . . . . . . . . . . . . . .>...             
4    |                   createtab_stmt            ...             
5    +---------------------------------------------...             
6    | CREATE TABLE `my_tbl`(       ...             
7    |   `col1` string,            ...             
8    |   `col2` bigint,                   ...             
9    |   `col3` string,                 ...                         
..                                                 ...                         
141  |   `coln` string)                 ...             
142  | ROW FORMAT SERDE                            ...             
143  |   'org.apache.hadoop.hive.ql.io.orc.OrcSerde...             
144  | STORED AS INPUTFORMAT                       ...             
145  |   'org.apache.hadoop.hive.ql.io.orc.OrcInput...             
146  | OUTPUTFORMAT                                ...             
147  |   'org.apache.hadoop.hive.ql.io.orc.OrcOutpu...             
148  | LOCATION                                    ...             
149  |   'gs://my_hive_data_bucket/tmp/base_table/my_tbl...             
150  | TBLPROPERTIES (                             ...             
151  |   'transient_lastDdlTime'='1566842329')     ...             
152  +---------------------------------------------...             
153                  143 rows selected (0.154 seconds)             
154               Beeline version 2.3.5 by Apache Hive             
155  Closing: 0: jdbc:hive2://prod-metastore-test-c... 

预期的输出应该是这样的:

CREATE TABLE `my_tbl`(
  `col1` string,
  `col2` bigint,
  `col3` string,
  ..
  `coln` string,  
)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'gs://my_hive_data_bucket/tmp/base_table/my_tbl'
TBLPROPERTIES (
  'transient_lastDdlTime'='1566842329')

请建议我接近解决方案的方法。

在 Dataproc 中,Hive 查询使用 Beeline 而不是已弃用的 Hive CLI,这就是默认情况下格式不同的原因。 Beeline 通常会将人类可读的输出格式化为花哨的边框格式,而不是更容易解析的格式。

幸运的是,有一些直线选项可以使格式非常接近旧的 Hive CLI 所做的。您可以简单地创建一个初始化操作,在创建 Dataproc 集群时将其添加到选项中,并在您的 Airflow 运算符中指定 init_actions_uris。创建一个包含以下内容的文件:

#!/bin/bash

sed -i 's/beeline/beeline --outputformat=tsv2 --silent=true/' /usr/bin/beeline

并将该文件上传到 GCS,例如 gs://some-gcs-bucket/beeline-legacyfmt.sh 并将该 GCS URI 设置为 Dataproc 集群的初始化操作。这将应用默认情况下直线所需的命令行选项。然后,您发送的任何 Dataproc Hive 作业现在将以 "tsv2" 和 "silent" 模式输出,这意味着没有无关的日志语句,输出将是原始 tsv。

迟到的报告,对于发现这个问题的人:

我最近经历了这个过程,发现使这个工作可靠的唯一方法是将查询的输出写入存储文件夹,然后从文件夹中的文件中读回。示例:

INSERT OVERWITE DIRECTORY "gs://bucket/path/"
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY "\t" 
LINES TERMINATED BY "\n"
SELECT ...

从作业日志中读取和解析并不是一个可靠的设计,并且格式在我部署第一个版本后几个月发生了变化。作业日志是给人看的,不是给电脑看的