将位数组库导入 SparkContext

Importing bitarray library into SparkContext

我正在尝试将位数组库导入 SparkContext。 https://pypi.python.org/pypi/bitarray/0.8.1.

为此,我将上下文压缩到位数组文件夹中,然后尝试将其添加到我的 python 文件中。但是,即使我将库推送到节点后,我的 RDD 也找不到该库。这是我的代码

zip bitarray.zip bitarray-0.8.1/bitarray/*

// Check the contents of the zip file 

unzip -l bitarray.zip
Archive:  bitarray.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
   143455  2015-11-06 02:07   bitarray/_bitarray.so
     4440  2015-11-06 02:06   bitarray/__init__.py
     6224  2015-11-06 02:07   bitarray/__init__.pyc
    68516  2015-11-06 02:06   bitarray/test_bitarray.py
    78976  2015-11-06 02:07   bitarray/test_bitarray.pyc
---------                     -------
   301611                     5 files

然后在 spark

import os 

# Environment
import findspark
findspark.init("/home/utils/spark-1.6.0/")

import pyspark
sparkConf = pyspark.SparkConf()

sparkConf.set("spark.executor.instances", "2") 
sparkConf.set("spark.executor.memory", "10g")
sparkConf.set("spark.executor.cores", "2")

sc = pyspark.SparkContext(conf = sparkConf)

from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import HiveContext
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import udf

hiveContext = HiveContext(sc)

PYBLOOM_LIB = '/home/ryandevera/pybloom.zip'
sys.path.append(PYBLOOM_LIB)
sc.addPyFile(PYBLOOM_LIB)

from pybloom import BloomFilter
f = BloomFilter(capacity=1000, error_rate=0.001)
x = sc.parallelize([(1,("hello",4)),(2,("goodbye",5)),(3,("hey",6)),(4,("test",7))],2)


def bloom_filter_spark(iterator):
    for id,_ in iterator:
        f.add(id)
    yield (None, f)

x.mapPartitions(bloom_filter_spark).take(1)

这会产生错误 -

ImportError: pybloom requires bitarray >= 0.3.4

我不确定我哪里出错了。任何帮助将不胜感激!

可能最简单的事情就是创建和分发 egg 文件。假设您已经从 PyPI 下载并解压源文件并设置 PYBLOOM_SOURCE_DIRBITARRAY_SOURCE_DIR 变量:

cd $PYBLOOM_SOURCE_DIR
python setup.py bdist_egg
cd $BITARRAY_SOURCE_DIR
python setup.py bdist_egg

在 PySpark 中添加:

from itertools import chain
import os
import glob

eggs = chain.from_iterable([
  glob.glob(os.path.join(os.environ[x], "dist/*")) for x in   
  ["PYBLOOM_SOURCE_DIR", "BITARRAY_SOURCE_DIR"]
])

for egg in eggs: sc.addPyFile(egg)

问题是 BloomFilter 对象无法正确序列化,因此如果您想使用它,您必须修补它或提取 bitarrays 并传递它们:

def buildFilter(iter):
    bf = BloomFilter(capacity=1000, error_rate=0.001)
    for x in iter:
        bf.add(x)
    return [bf.bitarray]

rdd = sc.parallelize(range(100))
rdd.mapPartitions(buildFilter).reduce(lambda x, y: x | y)