使用 AWS Glue 创建分区数据并保存到 s3
Create paritioned data using AWS Glue and save into s3
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col,year,month,dayofmonth,to_date,from_unixtime
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_name", table_name = "table_name", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("dateregistered", "timestamp", "dateregistered", "timestamp"), ("id", "int", "id", "int")], transformation_ctx = "applymapping1")
df = applymapping1.toDF()
repartitioned_with_new_columns_df = applymapping1.select("*")
.withColumn("date_col", to_date(from_unixtime(col("dateRegistered"))))
.withColumn("year", year(col("date_col")))
.withColumn("month", month(col("date_col")))
.withColumn("day", dayofmonth(col("date_col")))
.drop(col("date_col"))
#.repartition(1)
dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")
datasink = glueContext.write_dynamic_frame.from_options(
frame = dyf,
connection_type = "s3",
connection_options = {
"path": "bucket-path",
"partitionKeys": ["year", "month", "day"]
},
format = "json",
transformation_ctx = "datasink")
job.commit()
我有上面的脚本,但我不知道为什么不起作用,或者它是否是正确的方法。
有人可以检查并让我知道我做错了什么吗?
这里的目标是每天 运行 这个工作,然后写这个 table 分区,然后将它保存在 s3 json 或 parquet。
您在操作列时引用了错误的数据框。
applymapping1.select("*")
实际上应该是 df.select("*")
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col,year,month,dayofmonth,to_date,from_unixtime
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_name", table_name = "table_name", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("dateregistered", "timestamp", "dateregistered", "timestamp"), ("id", "int", "id", "int")], transformation_ctx = "applymapping1")
df = applymapping1.toDF()
repartitioned_with_new_columns_df = applymapping1.select("*")
.withColumn("date_col", to_date(from_unixtime(col("dateRegistered"))))
.withColumn("year", year(col("date_col")))
.withColumn("month", month(col("date_col")))
.withColumn("day", dayofmonth(col("date_col")))
.drop(col("date_col"))
#.repartition(1)
dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")
datasink = glueContext.write_dynamic_frame.from_options(
frame = dyf,
connection_type = "s3",
connection_options = {
"path": "bucket-path",
"partitionKeys": ["year", "month", "day"]
},
format = "json",
transformation_ctx = "datasink")
job.commit()
我有上面的脚本,但我不知道为什么不起作用,或者它是否是正确的方法。
有人可以检查并让我知道我做错了什么吗?
这里的目标是每天 运行 这个工作,然后写这个 table 分区,然后将它保存在 s3 json 或 parquet。
您在操作列时引用了错误的数据框。
applymapping1.select("*")
实际上应该是 df.select("*")