如何在 AWS Glue 作业脚本中从 AWS RDS 获取数据并相应地转换数据并将其插入回 aws rds?

How to fetch data from AWS RDS in AWS Glue job script and transform the data accordingly and insert it back in aws rds?

我想通过 AWS Glue 从 s3 bucket csv 中插入数据,但数据的插入取决于存储在 AWS RDS 中的数据(例如,如果该数据已经存在则不插入,或者如果已经存在并且只有一些属性该数据被更新然后更新值)。有什么方法可以在 AWS Glue 作业脚本中从 RDS 中获取数据,然后相应地转换数据。

我还想过在 AWS Glue 作业完成后调用 AWS lambda。 aws glue 作业脚本是否有任何代码可以在 aws glue 作业完成后调用 lambda,或者有任何方法可以获取 aws glue 作业的响应,例如 SNS 或我们可以调用 lambda 的东西?

我希望在 aws rds 中插入数据后 table,将调用一个存储过程来处理数据

我们已经设法从 Glue 作业直接将数据插入 MySQL Aurora 集群,并通过简单地发送 INSERT ON DUPLICATE KEY UPDATE 语句来避免重复。它看起来像这样:

import mysql

# Other logic here...

# Convert to DataFrame (from a DynamicFrame) 
df = mapping.toDF()

# Connect to MySQL
db = mysql.connect(glueContext, 'name-of-mysql-connection', 'db')
cursor = db.cursor()

insert_statement = """
  INSERT INTO my_table
  (column1, column2, updated_at)
  VALUES(%s, %s, NOW())
  ON DUPLICATE KEY UPDATE
  column1 = %s,
  column2 = %s,
  updated_at = NOW()
  """

# Execute each statement
for row in df.rdd.collect():
  cursor.execute(insert_statement, (
    row['column1'],
    row['column2'],
    row['column1'],
    row['column2']
  ))

# Commit to the database
db.commit()
db.close()