AWS Athena select 查询结果不时包含数据
AWS Athena select query result include data inconstantly
环境
从 python3.7 -> boto3 -> S3.
收集原始数据
使用 parquet(使用 fastparquet 序列化)
将 Glue Crawler 与非自定义一起使用(只需创建、分配 IAM 角色、s3 目标、制作空数据目录)
问题
仅在 2020-04-01 之前关注雅典娜查询 returns:
SELECT * FROM "rawdata" where "partition_0" >= '2020-03-29' and "partition_0" <= '2020-04-02';
SELECT * FROM "rawdata" where ("partition_0" = '2020-03-29' or "partition_0" = '2020-03-30' or "partition_0" = '2020-03-31' or "partition_0" = '2020-04-01' or "partition_0" = '2020-04-02')
#These two queries are same meaning, same result.
但是,如果我下一次查询 athena fowlling,它 returns 2020-04-02。
SELECT * FROM "rawdata" where "partition_0" >= '2020-04-02' and "partition_0" <= '2020-04-02';
SELECT * FROM "rawdata" where "partition_0" = '2020-04-02';
#Also these two queries are same meaning, same result.
结构
S3 分区遵循以下格式:
bucketname/collectorname/merged/rawdata/yyyy-mm-dd/data.parquet
Glue Crawler 有自己的 Data-Catalog,其名称与 Glue Crawler 相同。
Glue Crawler 的目标是下一个
bucketname/collectorname/merged/rawdata
每个 Glue Crawler 的 IAM 角色都是相同的,它们有两个 AWS 托管策略。
AWSGlueServiceRole
AmazonS3ReadOnlyAccess
parquet 文件使用 fastparquet pandas.to_parquet 保存,未压缩。
我没有在 Glue Crawler 中编辑任何脚本。
工作流程
收集器每 3 分钟收集 3000 行。 (周一至周五,上午 9 点至 15:30PM)
所以它合并到 3000 列镶木地板上以保存到下一个格式
#always next data (because 3 min term, but it is saperated with seconds)
bucketname/collectorname/notmerged/rawdata/yyyy-mm-dd/hh_mm_ss.parquet.bz2
#always overwrited, if someone request latest snapshot, system just use it (not athena)
bucketname/collectorname/cached/latest/data.parquet
下一次,解析器会工作。
Parser也是由python3.7制作的,接下来是它的伪代码。
import pandas as pd
import io
import boto3MyCustomWrapped
#download last collected dataframe
dfnew = pd.read_parquet(io.BytesIO(unzip(boto3MyCustomWrapped.s3.get_object(bucket="bucketname",key="collectorname/cached/latest/data.parquet.bz2")))
#today yyyy-mm-dd
strftime_of_today_datetime_yyyy_mm_dd = datetime.datetime.utcnow().strftime('%Y-%m-%d')
#merged data
dfold = pd.read_parquet(io.BytesIO(boto3MyCustomWrapped.s3.get_object(bucket="bucketname",key=f"collectorname/merged/{strftime_of_today_datetime_yyyy_mm_dd}/data.parquet"))
#some process is skipped (like processing if dfold not exist)
dfmerged = pd.concat([dfold, dfnew])
#validate for athena optimize (like column type clean)
dfmerged = validate_and_process_and_sort_and_clean_index(dfmerged)
#upload overwrite onto dfold's s3 path (!!data catalog is connected only this path!!)
boto3MyCustomWrapped.s3.put_object_from_dataframe(bucket="bucketname",
key=f"collectorname/merged/{strftime_of_today_datetime_yyyy_mm_dd}/data.parquet", dfmerged)
#today's glue crawling
if datetime.datetime.utcnow().hour == 9 and datetime.datetime.utcnow().minute < 10:
boto3MyCustomWrapped.glue.try_start_crawler('collectorname')
问题
我如何使用 Athena returns include today with 'including today' 查询? (不仅'exact today'查询)
问题是 Athena 结果包含或不包含 'today' 数据条件,即使我的每个查询都包含 'today'.
我不知道为什么,也不知道如何解决
更多信息
Athena 在所有情况下均未返回错误,所有 returns 列均正常。
为了查询 athena 中的新分区,您应该显式添加它们:
ALTER TABLE rawdata ADD
PARTITION (partition_0 = '2020-04-02');
或使用以下方法将所有分区加在一起:
msck repair table rawdata
我想在你的情况下,第一个查询 运行 在添加分区之前(通过胶水爬虫),这就是为什么 '2020-04-02' 的数据不可用。
看了好久,再也不会出现这个问题了。 (我什么都没做)
总之我终于搞不懂为什么会这样。
我对此感到有点不确定,但我觉得 Gabip 对我的问题的帮助负有责任。
我所做的一切都是
- 创建名称不带“.”的新数据目录(例如,目录名称是"crawling.siteurl.itemtype",table名称是"rawdata",我新建了一个table,名称是"crawlingsiteurlitemtype"。"rawdata")
- 从同一来源抓取相同的胶水。
- 向 "crawlingsiteurlitemtype" 显示分区。"rawdata" 运行良好(它包含 2020-04-02)
- 显示分区和 msck 修复不适用于我的源 table。 ("crawling.siteurl.itemtype"."rawdata")
- 但在 3 和 4 之后,athena 查询我的来源 table 的结果突然包含“2020-04-02”数据。
- 我今天挖了它,但我找不到它的工作原理,今天,查询原始数据目录仍然 returns '2020-04-06(=今天) 的良好响应'
- 所以我放弃了挖掘如何知道,但感觉有点不安全,因为总有一天,它可能会再次爆发。
无论如何,谢谢你的回答!
环境
从 python3.7 -> boto3 -> S3.
收集原始数据
使用 parquet(使用 fastparquet 序列化)
将 Glue Crawler 与非自定义一起使用(只需创建、分配 IAM 角色、s3 目标、制作空数据目录)
问题
仅在 2020-04-01 之前关注雅典娜查询 returns:
SELECT * FROM "rawdata" where "partition_0" >= '2020-03-29' and "partition_0" <= '2020-04-02';
SELECT * FROM "rawdata" where ("partition_0" = '2020-03-29' or "partition_0" = '2020-03-30' or "partition_0" = '2020-03-31' or "partition_0" = '2020-04-01' or "partition_0" = '2020-04-02')
#These two queries are same meaning, same result.
但是,如果我下一次查询 athena fowlling,它 returns 2020-04-02。
SELECT * FROM "rawdata" where "partition_0" >= '2020-04-02' and "partition_0" <= '2020-04-02';
SELECT * FROM "rawdata" where "partition_0" = '2020-04-02';
#Also these two queries are same meaning, same result.
结构
S3 分区遵循以下格式:
bucketname/collectorname/merged/rawdata/yyyy-mm-dd/data.parquet
Glue Crawler 有自己的 Data-Catalog,其名称与 Glue Crawler 相同。
Glue Crawler 的目标是下一个
bucketname/collectorname/merged/rawdata
每个 Glue Crawler 的 IAM 角色都是相同的,它们有两个 AWS 托管策略。
AWSGlueServiceRole
AmazonS3ReadOnlyAccess
parquet 文件使用 fastparquet pandas.to_parquet 保存,未压缩。
我没有在 Glue Crawler 中编辑任何脚本。
工作流程
收集器每 3 分钟收集 3000 行。 (周一至周五,上午 9 点至 15:30PM)
所以它合并到 3000 列镶木地板上以保存到下一个格式
#always next data (because 3 min term, but it is saperated with seconds)
bucketname/collectorname/notmerged/rawdata/yyyy-mm-dd/hh_mm_ss.parquet.bz2
#always overwrited, if someone request latest snapshot, system just use it (not athena)
bucketname/collectorname/cached/latest/data.parquet
下一次,解析器会工作。
Parser也是由python3.7制作的,接下来是它的伪代码。
import pandas as pd
import io
import boto3MyCustomWrapped
#download last collected dataframe
dfnew = pd.read_parquet(io.BytesIO(unzip(boto3MyCustomWrapped.s3.get_object(bucket="bucketname",key="collectorname/cached/latest/data.parquet.bz2")))
#today yyyy-mm-dd
strftime_of_today_datetime_yyyy_mm_dd = datetime.datetime.utcnow().strftime('%Y-%m-%d')
#merged data
dfold = pd.read_parquet(io.BytesIO(boto3MyCustomWrapped.s3.get_object(bucket="bucketname",key=f"collectorname/merged/{strftime_of_today_datetime_yyyy_mm_dd}/data.parquet"))
#some process is skipped (like processing if dfold not exist)
dfmerged = pd.concat([dfold, dfnew])
#validate for athena optimize (like column type clean)
dfmerged = validate_and_process_and_sort_and_clean_index(dfmerged)
#upload overwrite onto dfold's s3 path (!!data catalog is connected only this path!!)
boto3MyCustomWrapped.s3.put_object_from_dataframe(bucket="bucketname",
key=f"collectorname/merged/{strftime_of_today_datetime_yyyy_mm_dd}/data.parquet", dfmerged)
#today's glue crawling
if datetime.datetime.utcnow().hour == 9 and datetime.datetime.utcnow().minute < 10:
boto3MyCustomWrapped.glue.try_start_crawler('collectorname')
问题
我如何使用 Athena returns include today with 'including today' 查询? (不仅'exact today'查询)
问题是 Athena 结果包含或不包含 'today' 数据条件,即使我的每个查询都包含 'today'.
我不知道为什么,也不知道如何解决
更多信息
Athena 在所有情况下均未返回错误,所有 returns 列均正常。
为了查询 athena 中的新分区,您应该显式添加它们:
ALTER TABLE rawdata ADD
PARTITION (partition_0 = '2020-04-02');
或使用以下方法将所有分区加在一起:
msck repair table rawdata
我想在你的情况下,第一个查询 运行 在添加分区之前(通过胶水爬虫),这就是为什么 '2020-04-02' 的数据不可用。
看了好久,再也不会出现这个问题了。 (我什么都没做)
总之我终于搞不懂为什么会这样。
我对此感到有点不确定,但我觉得 Gabip 对我的问题的帮助负有责任。
我所做的一切都是
- 创建名称不带“.”的新数据目录(例如,目录名称是"crawling.siteurl.itemtype",table名称是"rawdata",我新建了一个table,名称是"crawlingsiteurlitemtype"。"rawdata")
- 从同一来源抓取相同的胶水。
- 向 "crawlingsiteurlitemtype" 显示分区。"rawdata" 运行良好(它包含 2020-04-02)
- 显示分区和 msck 修复不适用于我的源 table。 ("crawling.siteurl.itemtype"."rawdata")
- 但在 3 和 4 之后,athena 查询我的来源 table 的结果突然包含“2020-04-02”数据。
- 我今天挖了它,但我找不到它的工作原理,今天,查询原始数据目录仍然 returns '2020-04-06(=今天) 的良好响应'
- 所以我放弃了挖掘如何知道,但感觉有点不安全,因为总有一天,它可能会再次爆发。
无论如何,谢谢你的回答!