无法使用pyspark同时[并行]读取图像
Unable to read images simultaneously [in parallels] using pyspark
我在一个目录中有 10 张 jpeg 图片。
我想使用 pyspark 同时阅读所有这些内容。
我尝试如下。
from PIL import Image
from pyspark import SparkContext, SparkConf
conf = SparkConf()
spark = SparkContext(conf=conf)
files = glob.glob("E:\tests\*.jpg")
files_ = spark.parallelize(files)
arrs = []
for fi in files_.toLocalIterator():
im = Image.open(fi)
data = np.asarray(im)
arrs.append(data)
img = np.array(arrs)
print (img.shape)
代码无错结束,打印出来img.shape
;但是,它并没有 运行 并行。
你能帮帮我吗?
您可以使用 rdd.map to load and transform the pictures in parallel and then collect rdd 到 Python 列表中:
files = glob.glob("E:\tests\*.jpg")
file_rdd = spark.parallelize(files)
def image_to_array(path):
im = Image.open(path)
data = np.asarray(im)
return data
array_rdd = file_rdd.map(lambda f: image_to_array(f))
result_list = array_rdd.collect()
result_list
现在是一个包含 10 个元素的列表,每个元素是一个 numpy.ndarray
.
函数image_to_array
将在不同的Spark执行器上并行执行。如果你有一个多节点的 Spark 集群,你必须确保所有节点都可以访问 E:\tests\
.
收集数组后,可以继续处理
img = np.array(result_list, dtype=object)
我的解决方案遵循与 werner 相同的想法,但我只使用了 spark 库:
from pyspark.ml.image import ImageSchema
import numpy as np
df = (spark
.read
.format("image")
.option("pathGlobFilter", "*.jpg")
.load("your_data_path"))
df = df.select('image.*')
# Pre-caching the required schema. If you remove this line an error will be raised.
ImageSchema.imageFields
# Transforming images to np.array
arrays = df.rdd.map(ImageSchema.toNDArray).collect()
img = np.array(arrays)
print(img.shape)
我在一个目录中有 10 张 jpeg 图片。 我想使用 pyspark 同时阅读所有这些内容。 我尝试如下。
from PIL import Image
from pyspark import SparkContext, SparkConf
conf = SparkConf()
spark = SparkContext(conf=conf)
files = glob.glob("E:\tests\*.jpg")
files_ = spark.parallelize(files)
arrs = []
for fi in files_.toLocalIterator():
im = Image.open(fi)
data = np.asarray(im)
arrs.append(data)
img = np.array(arrs)
print (img.shape)
代码无错结束,打印出来img.shape
;但是,它并没有 运行 并行。
你能帮帮我吗?
您可以使用 rdd.map to load and transform the pictures in parallel and then collect rdd 到 Python 列表中:
files = glob.glob("E:\tests\*.jpg")
file_rdd = spark.parallelize(files)
def image_to_array(path):
im = Image.open(path)
data = np.asarray(im)
return data
array_rdd = file_rdd.map(lambda f: image_to_array(f))
result_list = array_rdd.collect()
result_list
现在是一个包含 10 个元素的列表,每个元素是一个 numpy.ndarray
.
函数image_to_array
将在不同的Spark执行器上并行执行。如果你有一个多节点的 Spark 集群,你必须确保所有节点都可以访问 E:\tests\
.
收集数组后,可以继续处理
img = np.array(result_list, dtype=object)
我的解决方案遵循与 werner 相同的想法,但我只使用了 spark 库:
from pyspark.ml.image import ImageSchema
import numpy as np
df = (spark
.read
.format("image")
.option("pathGlobFilter", "*.jpg")
.load("your_data_path"))
df = df.select('image.*')
# Pre-caching the required schema. If you remove this line an error will be raised.
ImageSchema.imageFields
# Transforming images to np.array
arrays = df.rdd.map(ImageSchema.toNDArray).collect()
img = np.array(arrays)
print(img.shape)