AWS push down predicate not working when reading HIVE partitions
Trying to test some Glue functionality, and the pushdown predicate is not working on Avro files in S3 that are partitioned for Hive. Our partitions look like this: YYYY-MM-DD.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
filterpred = "loaddate == '2019-08-08'"
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hive",
table_name = "stuff",
pushDownPredicate = filterpred)
print ('############################################')
print "COUNT: ", datasource0.count()
print ('##############################################')
df = datasource0.toDF()
df.show(5)
job.commit()
However, I still see Glue opening files for dates outside the specified range:
Opening 's3://data/2018-11-29/part-00000-a58ee9cb-c82c-46e6-9657-85b4ead2927d-c000.avro' for reading
2019-09-13 13:47:47,071 INFO [Executor task launch worker for task 258] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1208)) -
Opening 's3://data/2017-09-28/part-00000-53c07db9-05d7-4032-aa73-01e239f509cf.avro' for reading
I tried the examples from this question:
AWS Glue pushdown predicate not working properly
So far none of the proposed solutions have worked for me. I have tried adding the partition column (loaddate), removing it, quoting it, unquoting it, etc. It still reads files outside the date range.
There is a mistake in your code: the correct parameter to pass to the from_catalog function is "push_down_predicate", not "pushDownPredicate".
Sample snippet:
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database = "hive",
table_name = "stuff",
push_down_predicate = filterpred)
Reference in the AWS documentation: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html
Your partitions do not appear to follow the Hive naming style (key=value), so you have to use the default partition column partition_0 in your query. Also, as suggested in the other answer, the parameter is called push_down_predicate:
filterpred = "partition_0 == '2019-08-08'"
datasource0 = glue_context.create_dynamic_frame.from_catalog(
database = "hive",
table_name = "stuff",
push_down_predicate = filterpred)
Make sure your data is correctly partitioned, and run a Glue crawler to create the partitioned table (a sketch of starting one programmatically follows).
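A minimal sketch of starting the crawler from Python with boto3; the crawler name "stuff_crawler" is an assumption, and the crawler must already be configured to point at the S3 location of the Avro data:
import boto3

# Assumption: a Glue crawler named "stuff_crawler" already exists and
# targets the S3 prefix that holds the Avro files (e.g. s3://data/).
glue = boto3.client("glue")
glue.start_crawler(Name="stuff_crawler")

# Optionally check its state before running the Glue job
# (it cycles through RUNNING / STOPPING back to READY).
print(glue.get_crawler(Name="stuff_crawler")["Crawler"]["State"])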
Run this query in Athena to repair your table:
MSCK REPAIR TABLE tbl;
Run this query in Athena to check the partitions:
SHOW PARTITIONS tbl;
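Note that MSCK REPAIR TABLE only registers Hive-style (key=value) partition folders, while the folders in the question are plain YYYY-MM-DD names. Below is a sketch of registering one such partition manually in Athena; the table name, partition column, and S3 path are assumptions taken from the question and may need adjusting to match your actual schema:
ALTER TABLE stuff ADD IF NOT EXISTS
  PARTITION (loaddate = '2019-08-08')
  LOCATION 's3://data/2019-08-08/';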
In Scala you can use the code below.
Without a predicate:
val datasource0 = glueContext.getCatalogSource(database = "ny_taxi_db", tableName = "taxi_tbl", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
datasource0.toDF().count()
With a predicate:
val predicate = "(year == '2016' and year_month == '201601' and year_month_day == '20160114')"
val datasource1 = glueContext.getCatalogSource(database = "ny_taxi_db", tableName = "taxi_tbl", transformationContext = "datasource1", pushDownPredicate = predicate).getDynamicFrame()
datasource1.toDF().count()
In Python you can use the code below.
Without a predicate:
ds = glueContext.create_dynamic_frame.from_catalog(database = "ny_taxi_db", table_name = "taxi_data_by_vender", transformation_ctx = "datasource0")
ds.toDF().count()
With a predicate:
ds1 = glueContext.create_dynamic_frame.from_catalog(database = "ny_taxi_db" , table_name = "taxi_data_by_vender", transformation_ctx = "datasource1" , push_down_predicate = "(vendorid == 1)")
ds1.toDF().count()