如何正确地将普通 Python 应用程序转换为 PySpark 版本
How do I properly transform a normal Python application into PySpark version
我是 PySpark 的初学者,最近我尝试向我的 spark 集群提交一个简单的 python 应用程序(批量调整图片大小)。我可以通过 pycharm 成功 运行 应用程序,当我将我的应用程序提交给 spark 时,图像也会调整大小。
这是我原来的Python代码:
import os
from PIL import Image
size_64 = (64,64)
for f in os.listdir('.')
if f.endswith('.jpg'):
i = Image.open(f)
fn, fext = os.path.splitext(f)
i.thumbnail(size_64)
i.save('resize/{}_64'.format(fn, fext))
然后我将其转换为我认为可以正确提交我的 python 申请的方式:
import os
from PIL import Image
from pyspark import SparkContext, SparkConf
APP_NAME = "ImageResizer"
def main(sc):
size_64 = (64,64)
for f in os.listdir('.')
if f.endswith('.jpg'):
i = Image.open(f)
fn, fext = os.path.splitext(f)
i.thumbnail(size_64)
i.save('resize/{}_64'.format(fn, fext))
print 'done'
if __name__ == "__main__":
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster("spark://10.233.70.48:7077")
sc = SparkContext(conf=conf)
main(sc)
然而,我被告知我实际上根本没有使用 spark(我也这么认为,但我只是不知道如何)。我想知道如何正确地将我的原始代码转换为 Pyspark 方式。
熟悉 pyspark 的人可以帮助我吗?以及关于我在哪里可以正确和系统地学习如何编写 PySpark 应用程序的任何建议?
谢谢
但图像未调整大小 - 这与应用程序失败不同。提交应用程序时,它使用特定于应用程序的工作目录。那里不会有任何文件要处理,它不做任何工作就存在。
现在您根本没有使用 spark。您只是将 SparkContext 用作传递给主函数的变量(然后什么也不做)。为了使用 PySpark,您需要重新考虑您的应用程序。 os.listdir('.')
之类的命令在单台机器上运行良好,但是如果您 运行 它在计算机集群上运行,那么 .
指的是哪个目录?提交作业的机器?每台机器上的本地目录?共享网络驱动器?如果您只是 运行ning 在一台机器上(对于足够的测试)。您可以通过简单地并行化列表(将其转换为 RDD)来开始使用 Spark。然后,您可以在 RDD 上应用操作,例如 map
、filter
和 reduce
s_list = sc.parallelize(os.listdir('.'))
s_jpg_list = s_list.filter(lambda f: f.endswith('.jpg'))
def resize_image(f):
i = Image.open(f)
size_64 = (64,64)
fn, fext = os.path.splitext(f)
i.thumbnail(size_64)
out_path = 'resize/{}_64'.format(fn, fext)
i.save(out_path)
return out_path
s_jpg_files = s_jpg_list.map(resize_image)
print('Converted Images:', s_jpg_files.collect())
我是 PySpark 的初学者,最近我尝试向我的 spark 集群提交一个简单的 python 应用程序(批量调整图片大小)。我可以通过 pycharm 成功 运行 应用程序,当我将我的应用程序提交给 spark 时,图像也会调整大小。
这是我原来的Python代码:
import os
from PIL import Image
size_64 = (64,64)
for f in os.listdir('.')
if f.endswith('.jpg'):
i = Image.open(f)
fn, fext = os.path.splitext(f)
i.thumbnail(size_64)
i.save('resize/{}_64'.format(fn, fext))
然后我将其转换为我认为可以正确提交我的 python 申请的方式:
import os
from PIL import Image
from pyspark import SparkContext, SparkConf
APP_NAME = "ImageResizer"
def main(sc):
size_64 = (64,64)
for f in os.listdir('.')
if f.endswith('.jpg'):
i = Image.open(f)
fn, fext = os.path.splitext(f)
i.thumbnail(size_64)
i.save('resize/{}_64'.format(fn, fext))
print 'done'
if __name__ == "__main__":
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster("spark://10.233.70.48:7077")
sc = SparkContext(conf=conf)
main(sc)
然而,我被告知我实际上根本没有使用 spark(我也这么认为,但我只是不知道如何)。我想知道如何正确地将我的原始代码转换为 Pyspark 方式。
熟悉 pyspark 的人可以帮助我吗?以及关于我在哪里可以正确和系统地学习如何编写 PySpark 应用程序的任何建议? 谢谢
但图像未调整大小 - 这与应用程序失败不同。提交应用程序时,它使用特定于应用程序的工作目录。那里不会有任何文件要处理,它不做任何工作就存在。
现在您根本没有使用 spark。您只是将 SparkContext 用作传递给主函数的变量(然后什么也不做)。为了使用 PySpark,您需要重新考虑您的应用程序。 os.listdir('.')
之类的命令在单台机器上运行良好,但是如果您 运行 它在计算机集群上运行,那么 .
指的是哪个目录?提交作业的机器?每台机器上的本地目录?共享网络驱动器?如果您只是 运行ning 在一台机器上(对于足够的测试)。您可以通过简单地并行化列表(将其转换为 RDD)来开始使用 Spark。然后,您可以在 RDD 上应用操作,例如 map
、filter
和 reduce
s_list = sc.parallelize(os.listdir('.'))
s_jpg_list = s_list.filter(lambda f: f.endswith('.jpg'))
def resize_image(f):
i = Image.open(f)
size_64 = (64,64)
fn, fext = os.path.splitext(f)
i.thumbnail(size_64)
out_path = 'resize/{}_64'.format(fn, fext)
i.save(out_path)
return out_path
s_jpg_files = s_jpg_list.map(resize_image)
print('Converted Images:', s_jpg_files.collect())