PySpark - Spark 集群 EC2 - 无法保存到 S3

PySpark - Spark clusters EC2 - unable to save to S3

我已经设置了一个 spark 集群,其中有一个主节点和 2 个从节点(我使用的是 Spark Standalone)。该集群在某些示例中运行良好,但不适用于我的应用程序。我的应用程序工作流程是,它将读取 csv -> 提取 csv 中的每一行以及 header -> 转换为 JSON -> 保存到 S3。这是我的代码:

def upload_func(row):
    f = row.toJSON()
    f.saveAsTextFile("s3n://spark_data/"+ row.name +".json")
    print(f)
    print(row.name)

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL data source example") \
        .getOrCreate()
    df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED")
    df.rdd.map(upload_func)

我还将 AWS_Key_IDAWS_Secret_Key 导出到 ec2 环境中。但是使用上面的代码,我的应用程序不起作用。以下是问题:

  1. JSON 文件未保存在 S3 中,我已尝试 运行 几次应用程序并重新加载 S3 页面但没有数据。应用程序完成,日志中没有任何错误。此外,print(f)print(row.name) 不会在日志中打印出来。我需要修复什么才能在 S3 上保存 JSON,我是否可以在日志上打印以进行调试?

  2. 目前我需要将 csv 文件放在工作节点中,以便应用程序可以读取 csv 文件。我如何将文件放在另一个地方,比如说主节点,当应用程序 运行s 时,它会将 csv 文件拆分到所有工作节点,以便它们可以作为分布式系统并行上传?

非常感谢您的帮助。提前感谢您的帮助。

已更新

调试 Logger 后,我确定了地图函数 upload_func() 未被调用或应用程序无法进入此函数的问题(Logger 在函数调用前后打印消息)。知道原因的请帮忙看看?

您需要强制对地图进行评估; spark 只会按需执行工作。

df.rdd.map(upload_func).count()应该做