如何使用 DataprocHiveOperator 从 Hive 作业输出日志中提取查询结果?
How to extract the query result from a Hive job output logs using DataprocHiveOperator?
我正在尝试使用 Airflow 构建数据迁移管道,源是 Dataproc 集群上的 Hive table,目标是 BigQuery。我正在使用 DataprocHiveOperator 从源中获取架构和数据。此运算符在内部使用 Dataproc REST API 在我们指定的 Dataproc 集群上提交和执行作业。输出将作为作业日志的一部分写入文件 Google 云存储。我只需要这些日志的查询结果。
截至目前,我已将 gcp_dataproc_hook.py 代码修改为 return 调用方法的输出,方法是在 的帮助下将输出文件的内容下载为字符串driverOutputResourceUri 参数。此输出的 return 类型是 Pandas 数据框(可以根据我们的方便更改为任何其他类型)。但这包括完整的日志。我必须从中提取查询结果。
这是我在 gcp_dataproc_hook.py 添加到 return 提交查询的输出日志的代码片段:
#download the output
def getOutput(self,project, output_bucket,output_path):
client = storage.Client(project=self.project_id)
bucket = client.get_bucket(output_bucket)
output_blob = ('/'.join(output_path)+"."+"000000000")
return bucket.blob(output_blob).download_as_string()
#get logs including query output
def getQueryResult(self):
result=self.job_ouput
output = self.getOutput(result['reference']['projectId'],result['driverOutputResourceUri'].split('/')[2],result['driverOutputResourceUri'].split('/')[3:])
df = pd.read_csv(io.BytesIO(output), sep='\n|', nrows=500000, engine='python')
return df
这是我尝试执行的示例查询:
SHOW CREATE TABLE my_tbl;
输出日志如下所示:
Connecting to jdbc:hive2://prod-metastore-test-cluster1-m:10000
0 Connected to: Apache Hive (version 2.3.5)
1 Driver: Hive JDBC (version 2.3.5)
2 Transaction isolation: TRANSACTION_REPEATABLE_...
3 . . . . . . . . . . . . . . . . . . . . . . .>...
4 | createtab_stmt ...
5 +---------------------------------------------...
6 | CREATE TABLE `my_tbl`( ...
7 | `col1` string, ...
8 | `col2` bigint, ...
9 | `col3` string, ...
.. ...
141 | `coln` string) ...
142 | ROW FORMAT SERDE ...
143 | 'org.apache.hadoop.hive.ql.io.orc.OrcSerde...
144 | STORED AS INPUTFORMAT ...
145 | 'org.apache.hadoop.hive.ql.io.orc.OrcInput...
146 | OUTPUTFORMAT ...
147 | 'org.apache.hadoop.hive.ql.io.orc.OrcOutpu...
148 | LOCATION ...
149 | 'gs://my_hive_data_bucket/tmp/base_table/my_tbl...
150 | TBLPROPERTIES ( ...
151 | 'transient_lastDdlTime'='1566842329') ...
152 +---------------------------------------------...
153 143 rows selected (0.154 seconds)
154 Beeline version 2.3.5 by Apache Hive
155 Closing: 0: jdbc:hive2://prod-metastore-test-c...
预期的输出应该是这样的:
CREATE TABLE `my_tbl`(
`col1` string,
`col2` bigint,
`col3` string,
..
`coln` string,
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'gs://my_hive_data_bucket/tmp/base_table/my_tbl'
TBLPROPERTIES (
'transient_lastDdlTime'='1566842329')
请建议我接近解决方案的方法。
在 Dataproc 中,Hive 查询使用 Beeline 而不是已弃用的 Hive CLI,这就是默认情况下格式不同的原因。 Beeline 通常会将人类可读的输出格式化为花哨的边框格式,而不是更容易解析的格式。
幸运的是,有一些直线选项可以使格式非常接近旧的 Hive CLI 所做的。您可以简单地创建一个初始化操作,在创建 Dataproc 集群时将其添加到选项中,并在您的 Airflow 运算符中指定 init_actions_uris。创建一个包含以下内容的文件:
#!/bin/bash
sed -i 's/beeline/beeline --outputformat=tsv2 --silent=true/' /usr/bin/beeline
并将该文件上传到 GCS,例如 gs://some-gcs-bucket/beeline-legacyfmt.sh
并将该 GCS URI 设置为 Dataproc 集群的初始化操作。这将应用默认情况下直线所需的命令行选项。然后,您发送的任何 Dataproc Hive 作业现在将以 "tsv2" 和 "silent" 模式输出,这意味着没有无关的日志语句,输出将是原始 tsv。
迟到的报告,对于发现这个问题的人:
我最近经历了这个过程,发现使这个工作可靠的唯一方法是将查询的输出写入存储文件夹,然后从文件夹中的文件中读回。示例:
INSERT OVERWITE DIRECTORY "gs://bucket/path/"
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
LINES TERMINATED BY "\n"
SELECT ...
从作业日志中读取和解析并不是一个可靠的设计,并且格式在我部署第一个版本后几个月发生了变化。作业日志是给人看的,不是给电脑看的
我正在尝试使用 Airflow 构建数据迁移管道,源是 Dataproc 集群上的 Hive table,目标是 BigQuery。我正在使用 DataprocHiveOperator 从源中获取架构和数据。此运算符在内部使用 Dataproc REST API 在我们指定的 Dataproc 集群上提交和执行作业。输出将作为作业日志的一部分写入文件 Google 云存储。我只需要这些日志的查询结果。
截至目前,我已将 gcp_dataproc_hook.py 代码修改为 return 调用方法的输出,方法是在 的帮助下将输出文件的内容下载为字符串driverOutputResourceUri 参数。此输出的 return 类型是 Pandas 数据框(可以根据我们的方便更改为任何其他类型)。但这包括完整的日志。我必须从中提取查询结果。
这是我在 gcp_dataproc_hook.py 添加到 return 提交查询的输出日志的代码片段:
#download the output
def getOutput(self,project, output_bucket,output_path):
client = storage.Client(project=self.project_id)
bucket = client.get_bucket(output_bucket)
output_blob = ('/'.join(output_path)+"."+"000000000")
return bucket.blob(output_blob).download_as_string()
#get logs including query output
def getQueryResult(self):
result=self.job_ouput
output = self.getOutput(result['reference']['projectId'],result['driverOutputResourceUri'].split('/')[2],result['driverOutputResourceUri'].split('/')[3:])
df = pd.read_csv(io.BytesIO(output), sep='\n|', nrows=500000, engine='python')
return df
这是我尝试执行的示例查询:
SHOW CREATE TABLE my_tbl;
输出日志如下所示:
Connecting to jdbc:hive2://prod-metastore-test-cluster1-m:10000
0 Connected to: Apache Hive (version 2.3.5)
1 Driver: Hive JDBC (version 2.3.5)
2 Transaction isolation: TRANSACTION_REPEATABLE_...
3 . . . . . . . . . . . . . . . . . . . . . . .>...
4 | createtab_stmt ...
5 +---------------------------------------------...
6 | CREATE TABLE `my_tbl`( ...
7 | `col1` string, ...
8 | `col2` bigint, ...
9 | `col3` string, ...
.. ...
141 | `coln` string) ...
142 | ROW FORMAT SERDE ...
143 | 'org.apache.hadoop.hive.ql.io.orc.OrcSerde...
144 | STORED AS INPUTFORMAT ...
145 | 'org.apache.hadoop.hive.ql.io.orc.OrcInput...
146 | OUTPUTFORMAT ...
147 | 'org.apache.hadoop.hive.ql.io.orc.OrcOutpu...
148 | LOCATION ...
149 | 'gs://my_hive_data_bucket/tmp/base_table/my_tbl...
150 | TBLPROPERTIES ( ...
151 | 'transient_lastDdlTime'='1566842329') ...
152 +---------------------------------------------...
153 143 rows selected (0.154 seconds)
154 Beeline version 2.3.5 by Apache Hive
155 Closing: 0: jdbc:hive2://prod-metastore-test-c...
预期的输出应该是这样的:
CREATE TABLE `my_tbl`(
`col1` string,
`col2` bigint,
`col3` string,
..
`coln` string,
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'gs://my_hive_data_bucket/tmp/base_table/my_tbl'
TBLPROPERTIES (
'transient_lastDdlTime'='1566842329')
请建议我接近解决方案的方法。
在 Dataproc 中,Hive 查询使用 Beeline 而不是已弃用的 Hive CLI,这就是默认情况下格式不同的原因。 Beeline 通常会将人类可读的输出格式化为花哨的边框格式,而不是更容易解析的格式。
幸运的是,有一些直线选项可以使格式非常接近旧的 Hive CLI 所做的。您可以简单地创建一个初始化操作,在创建 Dataproc 集群时将其添加到选项中,并在您的 Airflow 运算符中指定 init_actions_uris。创建一个包含以下内容的文件:
#!/bin/bash
sed -i 's/beeline/beeline --outputformat=tsv2 --silent=true/' /usr/bin/beeline
并将该文件上传到 GCS,例如 gs://some-gcs-bucket/beeline-legacyfmt.sh
并将该 GCS URI 设置为 Dataproc 集群的初始化操作。这将应用默认情况下直线所需的命令行选项。然后,您发送的任何 Dataproc Hive 作业现在将以 "tsv2" 和 "silent" 模式输出,这意味着没有无关的日志语句,输出将是原始 tsv。
迟到的报告,对于发现这个问题的人:
我最近经历了这个过程,发现使这个工作可靠的唯一方法是将查询的输出写入存储文件夹,然后从文件夹中的文件中读回。示例:
INSERT OVERWITE DIRECTORY "gs://bucket/path/"
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
LINES TERMINATED BY "\n"
SELECT ...
从作业日志中读取和解析并不是一个可靠的设计,并且格式在我部署第一个版本后几个月发生了变化。作业日志是给人看的,不是给电脑看的