Parametrized/reusable AWS Glue 作业
Parametrized/reusable AWS Glue job
我是 AWS 的新手,我正在尝试创建一个应该具有输入参数的参数化 AWS Glue 作业:
- 数据源
- 数据大小
- 计数
- 变量列表
有没有人做过类似的事情?
首先,我不确定您是否能够按大小限制数据。相反,我建议按行数限制数据。正如我在 AWS Glue Job Input Parameters 中所述,您可以将其中两个变量放入您的工作中。当涉及到变量列表时,如果是大量的变量,我担心您将无法使用标准方式提供这些输入。在这种情况下,我建议以与数据相同的方式提供这些变量,我的意思是使用平面文件。例如:
var1;var2;var3
1;2;3
综上所述,我建议定义以下输入变量:
- Datasource(S3中存放数据的地方的路径,也可以把这个变量拆分成两个变量-database和table(在Glue数据目录中))
- 行数(您想要select的行数)
- 变量源(S3 中存储变量文件的位置的路径)
这是代码示例:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','SOURCE_DB','SOURCE_TAB','NUM_ROWS','DEST_FOLDER'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df_new = glueContext.create_dynamic_frame.from_catalog(database = args['SOURCE_DB'], table_name = args['SOURCE_TAB'], transformation_ctx = "full_data")
df_0 = df_new.toDF()
df_0.createOrReplaceTempView("spark_dataframe")
choice_data = spark.sql("Select x,y,z from spark_dataframe")
choice_data = choice_data.limit(int(args['NUM_ROWS']))
choice_data.repartition(1).write.format('csv').mode('overwrite').options(delimiter=',',header=True).save("s3://"+ args['DEST_FOLDER'] +"/")
job.commit()
当然,您还必须在 Glue 作业配置中提供适当的输入变量。
args = getResolvedOptions(sys.argv, ['JOB_NAME','source_db','source_table','count','dest_folder'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df_new = glueContext.create_dynamic_frame.from_catalog(database = args['source_db'], table_name = args['source_table'], transformation_ctx = "sample_data")
df_0 = df_new.toDF()
df_0.registerTempTable("spark_dataframe")
new_data = spark.sql("Select * from spark_dataframe")
sample = new_data.limit(args['count'])
sample.repartition(1).write.format('csv').options(delimiter=',',header=True).save("s3://"+ args['dest_folder'] +"/")
job.commit()
I am getting error for line
sample = new_data.limit(args['count'])
error:
py4j.Py4JException: Method limit([class java.lang.String]) does not exist
but the argument passed is not a string.
我是 AWS 的新手,我正在尝试创建一个应该具有输入参数的参数化 AWS Glue 作业:
- 数据源
- 数据大小
- 计数
- 变量列表
有没有人做过类似的事情?
首先,我不确定您是否能够按大小限制数据。相反,我建议按行数限制数据。正如我在 AWS Glue Job Input Parameters 中所述,您可以将其中两个变量放入您的工作中。当涉及到变量列表时,如果是大量的变量,我担心您将无法使用标准方式提供这些输入。在这种情况下,我建议以与数据相同的方式提供这些变量,我的意思是使用平面文件。例如:
var1;var2;var3
1;2;3
综上所述,我建议定义以下输入变量:
- Datasource(S3中存放数据的地方的路径,也可以把这个变量拆分成两个变量-database和table(在Glue数据目录中))
- 行数(您想要select的行数)
- 变量源(S3 中存储变量文件的位置的路径)
这是代码示例:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','SOURCE_DB','SOURCE_TAB','NUM_ROWS','DEST_FOLDER'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df_new = glueContext.create_dynamic_frame.from_catalog(database = args['SOURCE_DB'], table_name = args['SOURCE_TAB'], transformation_ctx = "full_data")
df_0 = df_new.toDF()
df_0.createOrReplaceTempView("spark_dataframe")
choice_data = spark.sql("Select x,y,z from spark_dataframe")
choice_data = choice_data.limit(int(args['NUM_ROWS']))
choice_data.repartition(1).write.format('csv').mode('overwrite').options(delimiter=',',header=True).save("s3://"+ args['DEST_FOLDER'] +"/")
job.commit()
当然,您还必须在 Glue 作业配置中提供适当的输入变量。
args = getResolvedOptions(sys.argv, ['JOB_NAME','source_db','source_table','count','dest_folder'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df_new = glueContext.create_dynamic_frame.from_catalog(database = args['source_db'], table_name = args['source_table'], transformation_ctx = "sample_data")
df_0 = df_new.toDF()
df_0.registerTempTable("spark_dataframe")
new_data = spark.sql("Select * from spark_dataframe")
sample = new_data.limit(args['count'])
sample.repartition(1).write.format('csv').options(delimiter=',',header=True).save("s3://"+ args['dest_folder'] +"/")
job.commit()
I am getting error for line
sample = new_data.limit(args['count'])
error:
py4j.Py4JException: Method limit([class java.lang.String]) does not exist
but the argument passed is not a string.