Importing bitarray library into SparkContext
I am trying to import the bitarray library into SparkContext (https://pypi.python.org/pypi/bitarray/0.8.1). To do this, I zipped the contents of the bitarray folder and then tried to add it to my Python files. However, even after pushing the library to the nodes, my RDD cannot find it. Here is my code:
zip bitarray.zip bitarray-0.8.1/bitarray/*
# Check the contents of the zip file
unzip -l bitarray.zip
Archive: bitarray.zip
Length Date Time Name
--------- ---------- ----- ----
143455 2015-11-06 02:07 bitarray/_bitarray.so
4440 2015-11-06 02:06 bitarray/__init__.py
6224 2015-11-06 02:07 bitarray/__init__.pyc
68516 2015-11-06 02:06 bitarray/test_bitarray.py
78976 2015-11-06 02:07 bitarray/test_bitarray.pyc
--------- -------
301611 5 files
Then in Spark:
import os
import sys

# Environment
import findspark
findspark.init("/home/utils/spark-1.6.0/")
import pyspark
sparkConf = pyspark.SparkConf()
sparkConf.set("spark.executor.instances", "2")
sparkConf.set("spark.executor.memory", "10g")
sparkConf.set("spark.executor.cores", "2")
sc = pyspark.SparkContext(conf = sparkConf)
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import HiveContext
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import udf
hiveContext = HiveContext(sc)
PYBLOOM_LIB = '/home/ryandevera/pybloom.zip'
sys.path.append(PYBLOOM_LIB)
sc.addPyFile(PYBLOOM_LIB)
from pybloom import BloomFilter
f = BloomFilter(capacity=1000, error_rate=0.001)
x = sc.parallelize([(1,("hello",4)),(2,("goodbye",5)),(3,("hey",6)),(4,("test",7))],2)
def bloom_filter_spark(iterator):
    for id, _ in iterator:
        f.add(id)
    yield (None, f)
x.mapPartitions(bloom_filter_spark).take(1)
This produces the error:
ImportError: pybloom requires bitarray >= 0.3.4
I am not sure where I am going wrong. Any help would be appreciated!
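For context, a plausible cause of the ImportError above (an inference, not stated in the thread): Python's zip importer only handles `.py`/`.pyc` files, so a compiled extension module like `bitarray/_bitarray.so` inside `bitarray.zip` can never be loaded from the archive on the executors. A minimal, self-contained demonstration of that limitation, using a throwaway package with placeholder contents:

```python
import importlib
import os
import sys
import tempfile
import zipfile

# Build a tiny package inside a zip: one pure-Python module plus a fake
# compiled-extension file (stand-ins for bitarray/__init__.py and _bitarray.so).
tmpdir = tempfile.mkdtemp()
archive = os.path.join(tmpdir, "demo.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("pkg/__init__.py", "value = 42\n")
    zf.writestr("pkg/_core.so", b"not a real shared object")

sys.path.insert(0, archive)

import pkg                   # the pure-Python part imports fine from the zip
print(pkg.value)             # -> 42

try:
    importlib.import_module("pkg._core")  # zipimport cannot load .so files
    core_loaded = True
except ImportError as exc:
    core_loaded = False
    print("ImportError:", exc)
```

The pure-Python `__init__.py` imports, but any submodule that only exists as a shared object raises ImportError, which matches the failure mode pybloom reports when its bitarray dependency is missing.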
Probably the simplest thing to do is to create and distribute egg files. Assuming you have downloaded and unpacked the source files from PyPI and set the PYBLOOM_SOURCE_DIR and BITARRAY_SOURCE_DIR variables:
cd $PYBLOOM_SOURCE_DIR
python setup.py bdist_egg
cd $BITARRAY_SOURCE_DIR
python setup.py bdist_egg
In PySpark add:
from itertools import chain
import os
import glob
eggs = chain.from_iterable([
    glob.glob(os.path.join(os.environ[x], "dist/*")) for x in
    ["PYBLOOM_SOURCE_DIR", "BITARRAY_SOURCE_DIR"]
])

for egg in eggs:
    sc.addPyFile(egg)
The problem is that BloomFilter objects cannot be serialized correctly, so if you want to use them you'll have to either patch them or extract the bitarrays and pass those around instead:
def buildFilter(iter):
    bf = BloomFilter(capacity=1000, error_rate=0.001)
    for x in iter:
        bf.add(x)
    return [bf.bitarray]

rdd = sc.parallelize(range(100))
rdd.mapPartitions(buildFilter).reduce(lambda x, y: x | y)
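The `reduce` step works because the union of two Bloom filters built with identical parameters is just the bitwise OR of their bit arrays. A toy pure-Python sketch of that property (hypothetical helper names, with an integer bitmask standing in for pybloom's bitarray):

```python
import hashlib

NUM_BITS = 64
NUM_HASHES = 3

def positions(item):
    # Derive NUM_HASHES bit positions from a SHA-256 digest of the item.
    digest = hashlib.sha256(str(item).encode()).digest()
    return [digest[i] % NUM_BITS for i in range(NUM_HASHES)]

def add(bits, item):
    # Set the item's bit positions; returns the updated bitmask.
    for p in positions(item):
        bits |= 1 << p
    return bits

def might_contain(bits, item):
    # True if all of the item's positions are set (false positives possible).
    return all(bits & (1 << p) for p in positions(item))

# Build two partial filters, as two Spark partitions would...
left = 0
for x in [1, 2, 3]:
    left = add(left, x)
right = 0
for x in [4, 5]:
    right = add(right, x)

# ...then merge them with bitwise OR, exactly like the reduce above.
merged = left | right
assert all(might_contain(merged, x) for x in [1, 2, 3, 4, 5])
```

Because Bloom filters have no false negatives, every item added to either partial filter is still reported present after the merge.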