PySpark - Spark 集群 EC2 - 无法保存到 S3
PySpark - Spark clusters EC2 - unable to save to S3
我已经设置了一个 spark 集群,其中有一个主节点和 2 个从节点(我使用的是 Spark Standalone)。该集群在某些示例中运行良好,但不适用于我的应用程序。我的应用程序工作流程是,它将读取 csv -> 提取 csv 中的每一行以及 header -> 转换为 JSON -> 保存到 S3。这是我的代码:
def upload_func(row):
f = row.toJSON()
f.saveAsTextFile("s3n://spark_data/"+ row.name +".json")
print(f)
print(row.name)
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.getOrCreate()
df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED")
df.rdd.map(upload_func)
我还将 AWS_Key_ID
和 AWS_Secret_Key
导出到 ec2 环境中。但是使用上面的代码,我的应用程序不起作用。以下是问题:
JSON 文件未保存在 S3 中,我已尝试 运行 几次应用程序并重新加载 S3 页面但没有数据。应用程序完成,日志中没有任何错误。此外,print(f)
和 print(row.name)
不会在日志中打印出来。我需要修复什么才能在 S3 上保存 JSON,我是否可以在日志上打印以进行调试?
目前我需要将 csv 文件放在工作节点中,以便应用程序可以读取 csv 文件。我如何将文件放在另一个地方,比如说主节点,当应用程序 运行s 时,它会将 csv 文件拆分到所有工作节点,以便它们可以作为分布式系统并行上传?
非常感谢您的帮助。提前感谢您的帮助。
已更新
调试 Logger 后,我确定了地图函数 upload_func()
未被调用或应用程序无法进入此函数的问题(Logger 在函数调用前后打印消息)。知道原因的请帮忙看看?
您需要强制对地图进行评估; spark 只会按需执行工作。
df.rdd.map(upload_func).count()
应该做
我已经设置了一个 spark 集群,其中有一个主节点和 2 个从节点(我使用的是 Spark Standalone)。该集群在某些示例中运行良好,但不适用于我的应用程序。我的应用程序工作流程是,它将读取 csv -> 提取 csv 中的每一行以及 header -> 转换为 JSON -> 保存到 S3。这是我的代码:
def upload_func(row):
f = row.toJSON()
f.saveAsTextFile("s3n://spark_data/"+ row.name +".json")
print(f)
print(row.name)
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.getOrCreate()
df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED")
df.rdd.map(upload_func)
我还将 AWS_Key_ID
和 AWS_Secret_Key
导出到 ec2 环境中。但是使用上面的代码,我的应用程序不起作用。以下是问题:
JSON 文件未保存在 S3 中,我已尝试 运行 几次应用程序并重新加载 S3 页面但没有数据。应用程序完成,日志中没有任何错误。此外,
print(f)
和print(row.name)
不会在日志中打印出来。我需要修复什么才能在 S3 上保存 JSON,我是否可以在日志上打印以进行调试?目前我需要将 csv 文件放在工作节点中,以便应用程序可以读取 csv 文件。我如何将文件放在另一个地方,比如说主节点,当应用程序 运行s 时,它会将 csv 文件拆分到所有工作节点,以便它们可以作为分布式系统并行上传?
非常感谢您的帮助。提前感谢您的帮助。
已更新
调试 Logger 后,我确定了地图函数 upload_func()
未被调用或应用程序无法进入此函数的问题(Logger 在函数调用前后打印消息)。知道原因的请帮忙看看?
您需要强制对地图进行评估; spark 只会按需执行工作。
df.rdd.map(upload_func).count()
应该做