使用 pyspark 将作业粘合到联合数据帧
Glue Job to union dataframes using pyspark
我基本上是在尝试 update/add 从一个 DF 到另一个 DF 的行。这是我的代码:
# S3
import boto3
# SOURCE
source_table = "someDynamoDbtable"
source_s3 = "s://mybucket/folder/"
# DESTINATION
destination_bucket = "s3://destination-bucket"
#Select which attributes to update/add
params = ['attributeD', 'attributeF', 'AttributeG']
#spark wrapper
glueContext = GlueContext(SparkContext.getOrCreate())
newData = glueContext.create_dynamic_frame.from_options(connection_type = "dynamodb", connection_options = {"tableName": source_table})
newValues = newData.select_fields(params)
newDF = newValues.toDF()
oldData = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [source_s3]}, format="orc", format_options={}, transformation_ctx="dynamic_frame")
oldDataValues = oldData.drop_fields(params)
oldDF = oldDataValues.toDF()
#makes a union of the dataframes
rebuildData = oldDF.union(newDF)
#error happens here
readyData = DynamicFrame.fromDF(rebuildData, glueContext, "readyData")
#writes new data to s3 destination, into orc files, while partitioning
glueContext.write_dynamic_frame.from_options(frame = readyData, connection_type = "s3", connection_options = {"path": destination_bucket}, format = "orc", partitionBy=['partition_year', 'partition_month', 'partition_day'])
我得到的错误是:
SyntaxError: invalid syntax on line readyData = ...
到目前为止我还不知道出了什么问题。
您正在执行数据帧和动态帧之间的联合操作。
这将创建一个名为 newData 的动态帧和一个名为 newDF:
的数据帧
newData = glueContext.create_dynamic_frame.from_options(connection_type = "dynamodb", connection_options = {"tableName": source_table})
newValues = newData.select_fields(params)
newDF = newValues.toDF()
这将创建一个名为 oldData 的动态帧和一个名为 oldDF 的数据帧:
oldData = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [source_s3]}, format="orc", format_options={}, transformation_ctx="dynamic_frame")
oldDataValues = oldData.drop_fields(params)
oldDF = oldDataValues.toDF()
并且您正在对上述两个实体执行联合操作,如下所示:
rebuildData = oldDF.union(newData)
应该是:
rebuildData = oldDF.union(newDF)
是的,所以我认为对于我需要做的事情,使用 OUTER JOIN 会更好。
让我解释一下:
- 我加载了两个数据帧,其中一个删除了我们要更新的字段。
- 第二个只选择那些字段,因此两者不会重复 rows/columns。
- 我们使用外部(或完整)连接,而不是只会添加行的联合。这添加了我的数据框中的所有数据,没有重复。
现在我的逻辑可能有缺陷,但到目前为止它对我来说还不错。如果有人正在寻找类似的解决方案,欢迎您使用。
我更改的代码:
rebuildData = oldDF.join(newData, 'id', 'outer')
我基本上是在尝试 update/add 从一个 DF 到另一个 DF 的行。这是我的代码:
# S3
import boto3
# SOURCE
source_table = "someDynamoDbtable"
source_s3 = "s://mybucket/folder/"
# DESTINATION
destination_bucket = "s3://destination-bucket"
#Select which attributes to update/add
params = ['attributeD', 'attributeF', 'AttributeG']
#spark wrapper
glueContext = GlueContext(SparkContext.getOrCreate())
newData = glueContext.create_dynamic_frame.from_options(connection_type = "dynamodb", connection_options = {"tableName": source_table})
newValues = newData.select_fields(params)
newDF = newValues.toDF()
oldData = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [source_s3]}, format="orc", format_options={}, transformation_ctx="dynamic_frame")
oldDataValues = oldData.drop_fields(params)
oldDF = oldDataValues.toDF()
#makes a union of the dataframes
rebuildData = oldDF.union(newDF)
#error happens here
readyData = DynamicFrame.fromDF(rebuildData, glueContext, "readyData")
#writes new data to s3 destination, into orc files, while partitioning
glueContext.write_dynamic_frame.from_options(frame = readyData, connection_type = "s3", connection_options = {"path": destination_bucket}, format = "orc", partitionBy=['partition_year', 'partition_month', 'partition_day'])
我得到的错误是:
SyntaxError: invalid syntax on line readyData = ...
到目前为止我还不知道出了什么问题。
您正在执行数据帧和动态帧之间的联合操作。
这将创建一个名为 newData 的动态帧和一个名为 newDF:
的数据帧newData = glueContext.create_dynamic_frame.from_options(connection_type = "dynamodb", connection_options = {"tableName": source_table})
newValues = newData.select_fields(params)
newDF = newValues.toDF()
这将创建一个名为 oldData 的动态帧和一个名为 oldDF 的数据帧:
oldData = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [source_s3]}, format="orc", format_options={}, transformation_ctx="dynamic_frame")
oldDataValues = oldData.drop_fields(params)
oldDF = oldDataValues.toDF()
并且您正在对上述两个实体执行联合操作,如下所示:
rebuildData = oldDF.union(newData)
应该是:
rebuildData = oldDF.union(newDF)
是的,所以我认为对于我需要做的事情,使用 OUTER JOIN 会更好。 让我解释一下:
- 我加载了两个数据帧,其中一个删除了我们要更新的字段。
- 第二个只选择那些字段,因此两者不会重复 rows/columns。
- 我们使用外部(或完整)连接,而不是只会添加行的联合。这添加了我的数据框中的所有数据,没有重复。
现在我的逻辑可能有缺陷,但到目前为止它对我来说还不错。如果有人正在寻找类似的解决方案,欢迎您使用。 我更改的代码:
rebuildData = oldDF.join(newData, 'id', 'outer')