Using python lime as a udf on spark

I want to use lime's explainer within a udf on pyspark. I've previously trained the tabular explainer and stored it as a dill model, as suggested in this link:

loaded_explainer = dill.load(open('location_to_explainer','rb'))

def lime_explainer(*cols):
    selected_cols = np.array([value for value in cols])
    exp = loaded_explainer.explain_instance(selected_cols, loaded_model.predict_proba, num_features = 10)
    mapping = exp.as_map()[1]

    return str(mapping)
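
For context, the function above is wired up as a Spark SQL udf roughly like this (the column names below are placeholders):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Hypothetical registration -- 'feature_a'/'feature_b'/'feature_c' stand in for the real feature columns.
lime_udf = udf(lime_explainer, StringType())
df = df.withColumn('lime_mapping', lime_udf('feature_a', 'feature_b', 'feature_c'))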

But this takes a lot of time, because it looks like much of the computation is happening on the driver. I have since been trying to use spark broadcast to broadcast the explainer to the executors.

broadcasted_explainer= sc.broadcast(loaded_explainer)

def lime_explainer(*cols):
    selected_cols = np.array([value for value in cols])
    exp = broadcasted_explainer.value.explain_instance(selected_cols, loaded_model.predict_proba, num_features = 10)
    mapping = exp.as_map()[1]

    return str(mapping)        

However, I ran into a pickling error with the broadcast.

PicklingError: Can't pickle <function <lambda> at 0x7f69fd5680d0>: attribute lookup <lambda> on lime.discretize failed

Can anyone help with this? Is there something like dill that I can use in place of the cloudpickler used in spark?

Looking at this source, it seems you have no choice but to use the pickler that is provided. As such, I can only suggest that you nest dill inside the default pickler. Not ideal, but it could work. Try something like:

broadcasted_explainer = dill.loads(sc.broadcast(dill.dumps(loaded_explainer)).value)
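
Spelled out, the idea is to broadcast the dill bytes and undill them on the executor side. A rough sketch of what that could look like inside the udf (it assumes loaded_model is also available on the workers):

import dill
import numpy as np

broadcasted_bytes = sc.broadcast(dill.dumps(loaded_explainer))

def lime_explainer(*cols):
    # Rebuild the explainer from the broadcast dill bytes on the executor.
    explainer = dill.loads(broadcasted_bytes.value)
    selected_cols = np.array([value for value in cols])
    exp = explainer.explain_instance(selected_cols, loaded_model.predict_proba, num_features=10)
    return str(exp.as_map()[1])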

Alternatively, you could try calling the Dill extend() method, which should add the Dill data types to the default pickle package dispatch. I don't know whether that would work, but you could give it a try!
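
For reference, the extend() call itself is a one-liner; a sketch of what trying it would look like (whether it actually takes effect depends on how pyspark imports its pickler):

import dill

dill.extend(use_dill=True)     # inject dill's types into the stdlib pickle dispatch
broadcasted_explainer = sc.broadcast(loaded_explainer)
dill.extend(use_dill=False)    # restore the default dispatch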

What is the schema of the data behind your location_to_explainer? It might be better to convert it into a spark dataframe.

From the dill description:

dill can be used to store python objects to a file, but the primary usage is to send python objects across the network as a byte stream. dill is quite flexible, and allows arbitrary user defined classes and functions to be serialized. Thus dill is not intended to be secure against erroneously or maliciously constructed data. It is left to the user to decide whether the data they unpickle is from a trustworthy source.

When Not To Use pickle

If you want to use data across different programming languages, pickle is not recommended. Its protocol is specific to Python, thus, cross-language compatibility is not guaranteed. The same holds for different versions of Python itself. Unpickling a file that was pickled in a different version of Python may not always work properly, so you have to make sure that you're using the same version and perform an update if necessary. You should also try not to unpickle data from an untrusted source. Malicious code inside the file might be executed upon unpickling.

According to this discussion, you can try pysparkling:

I don't think this is a dill issue, as I don't think your code is using dill. So, as far as I know, pyspark uses pickle or cloudpickle and not dill. However, if you do want to use dill with pyspark, there is pysparkling (https://pypi.python.org/pypi/pysparkling)... and using it may clear up your serialization issue. What I suggest is that you open a ticket with pyspark or try pysparkling and if it fails, open a ticket there -- and CC me or refer to this issue so I can follow the thread. I'm going to close this... so if I'm incorrect and you are using dill, please feel free to reopen this issue.

Further reading: Reading pyspark pickles locally
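
pysparkling is a pure-python reimplementation of the RDD interface, so it is not tied to Spark's hardwired pickler. A minimal sketch of what using it looks like (not from the original thread):

from pysparkling import Context

sc = Context()                                    # pure-python stand-in for SparkContext
rdd = sc.parallelize([[1.0, 2.0], [3.0, 4.0]])
print(rdd.map(lambda row: sum(row)).collect())    # [3.0, 7.0]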

I'm the dill author. I agree with @Majaha, and will extend @Majaha's answer slightly. In the first link of @Majaha's answer, it's clearly pointed out that a Broadcast instance is hardwired to use pickle... so the suggestion to dill to a string and then undill afterwards is a good one.

Unfortunately, the extend method probably won't work for you. In the Broadcast class, the source uses cPickle, which dill cannot extend. If you look at the source, it uses import CPickle as pickle; ... pickle.dumps for python 2, and import pickle; ... pickle.dumps for python 3. Had it used import pickle; ... pickle.dumps for python 2, and import pickle; ... pickle._dumps for python 3, then dill could extend the pickler just by doing an import dill. For example:

Python 3.6.6 (default, Jun 28 2018, 05:53:46) 
[GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from pickle import _dumps
>>> import dill
>>> _dumps(lambda x:x)
b'\x80\x03cdill._dill\n_create_function\nq\x00(cdill._dill\n_load_type\nq\x01X\x08\x00\x00\x00CodeTypeq\x02\x85q\x03Rq\x04(K\x01K\x00K\x01K\x01KCC\x04|\x00S\x00q\x05N\x85q\x06)X\x01\x00\x00\x00xq\x07\x85q\x08X\x07\x00\x00\x00<stdin>q\tX\x08\x00\x00\x00<lambda>q\nK\x01C\x00q\x0b))tq\x0cRq\rc__main__\n__dict__\nh\nNN}q\x0etq\x0fRq\x10.'

So, you could do as @Majaha suggests (and bookend the call to broadcast), or you could patch the code to make the replacement I outlined above (where needed, but eh...), or you could make your own derived class that does the job using dill:

>>> from pyspark.broadcast import Broadcast as _Broadcast
>>>
>>> class Broadcast(_Broadcast):
...   def dump(self, value, f):
...     try:
...       import dill
...       dill.dump(value, f, pickle_protocol)
...     ...[INSERT THE REST OF THE DUMP METHOD HERE]...

If the above fails... you could still get this to work by pinpointing where the serialization failure happens (dill.detect.trace can help you with that).
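
As a pointer, tracing looks roughly like this (a sketch; the exact trace API has shifted between dill versions):

import dill
from dill.detect import trace

trace(True)                    # print each object dill visits while serializing
dill.dumps(loaded_explainer)   # the trace shows which nested object causes trouble
trace(False)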

If you are going to suggest to pyspark that they use dill... a potentially better suggestion is to allow users to dynamically replace the serializer. That's what mpi4py and a few other packages do.
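
For comparison, mpi4py lets the user swap in dill with a single call, roughly like this (the exact call may vary with the mpi4py version):

import dill
from mpi4py import MPI

# Tell mpi4py to serialize python objects with dill instead of pickle.
MPI.pickle.__init__(dill.dumps, dill.loads)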