如何使用 Apache-Spark 在 AWS 集群上 运行 编码?
How to run code on the AWS cluster using Apache-Spark?
我写了一个 python 代码来总结每个 csv 文件第一列中的所有数字,如下所示:
import os, sys, inspect, csv
### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]
### Setup the environment variables
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir
### Setup pyspark directory path
pyspark_dir = python_dir
sys.path.append(pyspark_dir)
### Import the pyspark
from pyspark import SparkConf, SparkContext
### Specify the data file directory, and load the data files
data_path = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "./test_dir")))
### myfunc is to add all numbers in the first column.
def myfunc(s):
total = 0
if s.endswith(".csv"):
cr = csv.reader(open(s,"rb"))
for row in cr:
total += int(row[0])
return total
def main():
### Initialize the SparkConf and SparkContext
conf = SparkConf().setAppName("ruofan").setMaster("spark://ec2-52-26-177-197.us-west-2.compute.amazonaws.com:7077")
sc = SparkContext(conf = conf)
datafile = sc.wholeTextFiles(data_path)
### Sent the application in each of the slave node
temp = datafile.map(lambda (path, content): myfunc(str(path).strip('file:')))
### Collect the result and print it out.
for x in temp.collect():
print x
if __name__ == "__main__":
main()
我想使用 Apache-Spark 使用相同的 python 代码并行处理多个 csv 文件的求和过程。我已经完成了以下步骤:
- 我已经在 AWS 上创建了一个主节点和两个从节点。
- 我使用 bash 命令
$ scp -r -i my-key-pair.pem my_dir root@ec2-52-27-82-124.us-west-2.compute.amazonaws.com
将目录 my_dir
包括我的 python 代码和 csv 文件上传到集群主节点上。
- 我已经登录我的主节点,然后使用 bash 命令
$ ./spark/copy-dir my_dir
将我的 python 代码和 csv 文件发送到所有从节点。
我已经在主节点上设置了环境变量:
$ export SPARK_HOME=~/spark
$ export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
然而,当我运行主节点上的python代码时:$ python sum.py
,它显示以下错误:
Traceback (most recent call last):
File "sum.py", line 18, in <module>
from pyspark import SparkConf, SparkContext
File "/root/spark/python/pyspark/__init__.py", line 41, in <module>
from pyspark.context import SparkContext
File "/root/spark/python/pyspark/context.py", line 31, in <module>
from pyspark.java_gateway import launch_gateway
File "/root/spark/python/pyspark/java_gateway.py", line 31, in <module>
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
ImportError: No module named py4j.java_gateway
我不知道这个错误。另外,我想知道主节点是否会自动并行调用所有从节点到运行。如果有人能帮助我,我真的很感激。
我认为您在问两个不同的问题。您似乎遇到了导入错误。是否有可能您在本地计算机上安装了不同版本的软件包 py4j 而您尚未在主节点上安装?
我无法并行处理 运行 这个问题。
下面是我将如何调试这个特定的导入错误。
- ssh 到你的主节点
- 运行 python REPL 与
$ python
- 尝试失败的导入行
>> from py4j.java_gateway import java_import, JavaGateway, GatewayClient
- 如果失败,请尝试 运行
>> import py4j
- 如果失败,则表示您的系统没有安装 py4j 或找不到它。
- 退出 REPL
>> exit()
- 尝试安装 py4j
$ pip install py4j
(您需要安装 pip)
- 打开 REPL
$ python
- 再次尝试导入
>> from py4j.java_gateway import java_import, JavaGateway, GatewayClient
- 如果可行,那么
>> exit()
并再次尝试 运行 您的 $ python sum.py
我写了一个 python 代码来总结每个 csv 文件第一列中的所有数字,如下所示:
import os, sys, inspect, csv
### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]
### Setup the environment variables
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir
### Setup pyspark directory path
pyspark_dir = python_dir
sys.path.append(pyspark_dir)
### Import the pyspark
from pyspark import SparkConf, SparkContext
### Specify the data file directory, and load the data files
data_path = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "./test_dir")))
### myfunc is to add all numbers in the first column.
def myfunc(s):
total = 0
if s.endswith(".csv"):
cr = csv.reader(open(s,"rb"))
for row in cr:
total += int(row[0])
return total
def main():
### Initialize the SparkConf and SparkContext
conf = SparkConf().setAppName("ruofan").setMaster("spark://ec2-52-26-177-197.us-west-2.compute.amazonaws.com:7077")
sc = SparkContext(conf = conf)
datafile = sc.wholeTextFiles(data_path)
### Sent the application in each of the slave node
temp = datafile.map(lambda (path, content): myfunc(str(path).strip('file:')))
### Collect the result and print it out.
for x in temp.collect():
print x
if __name__ == "__main__":
main()
我想使用 Apache-Spark 使用相同的 python 代码并行处理多个 csv 文件的求和过程。我已经完成了以下步骤:
- 我已经在 AWS 上创建了一个主节点和两个从节点。
- 我使用 bash 命令
$ scp -r -i my-key-pair.pem my_dir root@ec2-52-27-82-124.us-west-2.compute.amazonaws.com
将目录my_dir
包括我的 python 代码和 csv 文件上传到集群主节点上。 - 我已经登录我的主节点,然后使用 bash 命令
$ ./spark/copy-dir my_dir
将我的 python 代码和 csv 文件发送到所有从节点。 我已经在主节点上设置了环境变量:
$ export SPARK_HOME=~/spark
$ export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
然而,当我运行主节点上的python代码时:$ python sum.py
,它显示以下错误:
Traceback (most recent call last):
File "sum.py", line 18, in <module>
from pyspark import SparkConf, SparkContext
File "/root/spark/python/pyspark/__init__.py", line 41, in <module>
from pyspark.context import SparkContext
File "/root/spark/python/pyspark/context.py", line 31, in <module>
from pyspark.java_gateway import launch_gateway
File "/root/spark/python/pyspark/java_gateway.py", line 31, in <module>
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
ImportError: No module named py4j.java_gateway
我不知道这个错误。另外,我想知道主节点是否会自动并行调用所有从节点到运行。如果有人能帮助我,我真的很感激。
我认为您在问两个不同的问题。您似乎遇到了导入错误。是否有可能您在本地计算机上安装了不同版本的软件包 py4j 而您尚未在主节点上安装?
我无法并行处理 运行 这个问题。
下面是我将如何调试这个特定的导入错误。
- ssh 到你的主节点
- 运行 python REPL 与
$ python
- 尝试失败的导入行
>> from py4j.java_gateway import java_import, JavaGateway, GatewayClient
- 如果失败,请尝试 运行
>> import py4j
- 如果失败,则表示您的系统没有安装 py4j 或找不到它。
- 退出 REPL
>> exit()
- 尝试安装 py4j
$ pip install py4j
(您需要安装 pip) - 打开 REPL
$ python
- 再次尝试导入
>> from py4j.java_gateway import java_import, JavaGateway, GatewayClient
- 如果可行,那么
>> exit()
并再次尝试 运行 您的$ python sum.py