如何使用 PySpark 和 Jupyter 分发 类

How to distibute classes with PySpark and Jupyter

我在使用带有 spark 的 jupyter notebook 时遇到了一个烦人的问题。

我需要在notebook里面定义一个自定义的class,并用它来进行一些地图操作

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark import SQLContext

conf = SparkConf().setMaster("spark://192.168.10.11:7077")\
              .setAppName("app_jupyter/")\
              .set("spark.cores.max", "10")

sc = SparkContext(conf=conf)

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

class demo(object):
    def __init__(self, value):
        self.test = value + 10
        pass

distData.map(lambda x : demo(x)).collect()

它给出了以下错误:

PicklingError: Can't pickle : attribute lookup main.demo failed

我知道这个错误是怎么回事,但我想不出解决办法..

我试过:

  1. 在笔记本外定义一个demo.pypython文件。它有效,但它是一个丑陋的解决方案......
  2. 创建一个动态模块like this,然后导入它...这给出了同样的错误

什么是解决方案?...我希望所有内容都在同一个笔记本中工作

可以更改以下内容:

  1. spark 的工作方式,可能是一些 pickle 配置
  2. 代码中的某些内容...使用一些静态魔术方法

此处没有可靠且优雅的解决方法,并且此行为与 Spark 没有特别相关。 This is all about fundamental design of the Python pickle

pickle can save and restore class instances transparently, however the class definition must be importable and live in the same module as when the object was stored.

理论上你可以定义一个 custom cell magic 它将:

  • 将单元格的内容写入模块。
  • 导入它。
  • 调用SparkContext.addPyFile分发模块。
from IPython.core.magic import register_cell_magic
import importlib

@register_cell_magic
def spark_class(line, cell):
    module = line.strip()
    f = "{0}.py".format(module)

    with open(f, "w") as fw:
        fw.write(cell)

    globals()[module] = importlib.import_module(module)
    sc.addPyFile(f)
In [2]: %%spark_class foo
   ...: class Foo(object):
   ...:     def __init__(self, x):
   ...:         self.x = x
   ...:     def __repr__(self):
   ...:         return "Foo({0})".format(self.x)
   ...: 

In [3]: sc.parallelize([1, 2, 3]).map(lambda x: foo.Foo(x)).collect()
Out[3]: [Foo(1), Foo(2), Foo(3)]       

但这是一次性交易。一旦文件被标记为分发,就不能更改或重新分发。此外还有在远程主机上重新加载导入的问题。我可以想出一些更复杂的方案,但这只是麻烦得不偿失。

答案 是可靠的:没有一种“正确”的方法可以解决这个问题。您确实可以按照建议使用 Jupyter 魔法。另一种方法是使用 Jupyter 的 %%writefile 将您的代码内嵌在 Jupyter 单元格中,然后将其作为 python 文件写入磁盘。然后您既可以将该文件导入到您的 Jupyter 内核会话中,也可以将其与您的 PySpark 作业一起发送(通过 addPyFile(),如另一个答案中所述)。请注意,如果您更改了代码但没有重新启动 PySpark 会话,则必须以某种方式将更新后的代码发送到 PySpark。

我们可以让这更容易吗?我写了一个 blogpost about this topic as well as a PySpark Session wrapper (oarphpy.spark.NBSpark) 来帮助自动化很多棘手的事情。有关工作示例,请参阅嵌入在 post 中的 Jupyter Notebook。整体模式如下所示:

import os
import sys
CUSTOM_LIB_SRC_DIR = '/tmp/'
os.chdir(CUSTOM_LIB_SRC_DIR)

!mkdir -p mymodule
!touch mymodule/__init__.py
%%writefile mymodule/foo.py
class Zebra(object):
    def __init__(self, name):
        self.name = name
sys.path.append(CUSTOM_LIB_SRC_DIR)
from mymodule.foo import Zebra

# Create Zebra() instances in the notebook
herd = [Zebra(name=str(i)) for i in range(10)]
# Now send those instances to PySpark!
from oarphpy.spark import NBSpark

NBSpark.SRC_ROOT = os.path.join(CUSTOM_LIB_SRC_DIR, 'mymodule')
spark = NBSpark.getOrCreate()
rdd = spark.sparkContext.parallelize(herd)
def get_name(z):
    return z.name
names = rdd.map(get_name).collect()

此外,如果您对磁盘上的 mymodule 文件进行了任何更改(通过 %%writefile 或其他方式),则 NBSpark 会自动将这些更改发送到活动的 PySpark 会话。