Spark 2.4.0 dependencies to write to AWS Redshift

I am struggling to find the correct package dependencies and their versions to write to a Redshift database using the PySpark micro-batch approach.

What are the correct dependencies to achieve this?

As suggested by the AWS tutorial, the JDBC driver must be provided:

wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar

After downloading this jar and making it available to the spark-submit command, this is how I supply the dependencies:

spark-submit --master yarn --deploy-mode cluster \
  --jars RedshiftJDBC4-no-awssdk-1.2.20.1043.jar \
  --packages com.databricks:spark-redshift_2.10:2.0.0,org.apache.spark:spark-avro_2.11:2.4.0,com.eclipsesource.minimal-json:minimal-json:0.9.4 \
  my_script.py
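
For quick local testing, the same jars and packages can also be declared in code when the SparkSession is built (a minimal sketch, assuming no SparkContext exists yet; in yarn/cluster mode the --jars and --packages flags above are what actually take effect):

from pyspark.sql import SparkSession

# Sketch only: spark.jars / spark.jars.packages must be set before the first
# SparkSession is created, otherwise they are ignored.
spark = (
    SparkSession.builder
    .config("spark.jars", "RedshiftJDBC4-no-awssdk-1.2.20.1043.jar")
    .config(
        "spark.jars.packages",
        "com.databricks:spark-redshift_2.10:2.0.0,"
        "org.apache.spark:spark-avro_2.11:2.4.0,"
        "com.eclipsesource.minimal-json:minimal-json:0.9.4",
    )
    .getOrCreate()
)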

Finally, this is the script I pass to spark-submit:

my_script.py
from pyspark.sql import SparkSession

def foreach_batch_function(df, epoch_id, table_name):
    # Write one micro-batch to Redshift through the spark-redshift connector,
    # staging the data in the S3 tempdir.
    df.write\
        .format("com.databricks.spark.redshift") \
        .option("aws_iam_role", my_role) \
        .option("url", my_redshift_url) \
        .option("user", my_redshift_user) \
        .option("password", my_redshift_password) \
        .option("dbtable", my_redshift_schema + "." + table_name)\
        .option("tempdir", "s3://my/temp/dir") \
        .mode("append")\
        .save()

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext

# S3 credentials for the temp bucket the connector uses for staging
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", my_aws_access_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", my_aws_secret_access_key)

# Take the schema from an existing parquet file so readStream can use it
my_schema = spark.read.parquet(my_schema_file_path).schema

# Stream parquet files from the source path, up to 100 new files per trigger
df = spark \
    .readStream \
    .schema(my_schema) \
    .option("maxFilesPerTrigger", 100) \
    .parquet(my_source_path)

# Process a micro-batch every 30 seconds and hand it to foreach_batch_function
df.writeStream \
    .trigger(processingTime='30 seconds') \
    .foreachBatch(lambda df, epochId: foreach_batch_function(df, epochId, my_redshift_table))\
    .option("checkpointLocation", my_checkpoint_location) \
    .start(outputMode="update").awaitTermination()
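
To check that the driver and connector resolve at all before starting the stream, a small batch write through the same function can serve as a smoke test (a sketch only, assuming the placeholder variables such as my_role and my_redshift_url are filled in; my_test_table is a hypothetical table name, not part of the original script):

# Hypothetical smoke test: push a tiny static DataFrame through the same
# write path to confirm the Redshift connector and JDBC driver are available.
test_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
foreach_batch_function(test_df, epoch_id=0, table_name="my_test_table")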