创建一个 class 支持 json 序列化以用于 Celery

Create a class that support json serialization for use with Celery

我正在使用 Celery 运行 一些后台任务。我创建的任务之一 returns a python class。考虑到有关使用 pickle 的警告,我想使用 json 序列化和反序列化此 class。

是否有一种简单的内置方法可以实现此目的?

class非常简单,它包含3个属性,都是命名元组列表。它包含几个对属性执行一些计算的方法。

我的想法是serialize/deserialize这3个属性,因为那定义了class。

这是我对编码器的想法,但我不确定如何再次解码数据?

import json

class JSONSerializable(object):
    def __repr__(self):
        return json.dumps(self.__dict__)

class MySimpleClass(JSONSerializable):
    def __init__(self, p1, p2, p3): # I only care about p1, p2, p3
        self.p1 = p1
        self.p2 = p2
        self.p3 = p2
        self.abc = p1 + p2 + p2

    def some_calc(self):
        ...

首先但同样重要的是:针对泡菜的警告主要是如果您可以让第 3 部分在您的工作流中注入泡菜数据。如果您确定您自己的系统正在创建所有要使用的 pickle 数据,则根本没有安全问题。至于兼容性,它相对容易处理,如果你的 Pickle 文件的生产者和消费者使用相同的 Python 版本,它是自动的。

也就是说,对于 JSON,您必须创建 Python's json.JSONEncoder and json.JSONDecoder 的子 class - 每个都需要作为 cls 参数传递给您所有的 json.dump(s)json.load(s) 通话。

一个建议是编码器上的 default 方法编码 class __module__,它的 __name__ 和一个标识符键,比如说 __custom__ 到确保它应该被自定义解码,作为字典的键,对象的数据作为 "data" 键。

并且在编码器上,您检查 __custom__ 键,然后它们使用 __new__ 方法实例化一个 class,并填充其字典。与 pickle 一样,在 class __init__ 上触发的副作用不会 运行。

您稍后可以增强解码器和编码器,例如,它们可以在 class 中搜索只能处理所需属性的 __json_encode__ 方法。

实施示例:

import json

class GenericJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        try:
            return super().default(obj)
        except TypeError:
            pass
        cls = type(obj)
        result = {
            '__custom__': True,
            '__module__': cls.__module__,
            '__name__': cls.__name__,
            'data': obj.__dict__ if not hasattr(cls, '__json_encode__') else obj.__json_encode__
        }
        return result


class GenericJSONDecoder(json.JSONDecoder):
    def decode(self, str):
        result = super().decode(str)
        if not isinstance(result, dict) or not result.get('__custom__', False):
            return result
        import sys
        module = result['__module__']
        if not module in sys.modules:
            __import__(module)
        cls = getattr(sys.modules[module], result['__name__'])
        if hasattr(cls, '__json_decode__'):
            return cls.__json_decode__(result['data'])
        instance = cls.__new__(cls)
        instance.__dict__.update(result['data'])
        return instance

控制台交互测试:

In [36]: class A:
    ...:     def __init__(self, a):
    ...:         self.a = a
    ...:         

In [37]: a = A('test')

In [38]: b = json.loads(json.dumps(a, cls=GenericJSONEncoder),  cls=GenericJSONDecoder)

In [39]: b.a
Out[39]: 'test'

这是@jsbueno 提供的出色解决方案的改进版本,它也适用于嵌套的自定义类型。

import json
import collections
import six

def is_iterable(arg):
    return isinstance(arg, collections.Iterable) and not isinstance(arg, six.string_types)


class GenericJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        try:
            return super().default(obj)
        except TypeError:
            pass
        cls = type(obj)
        result = {
            '__custom__': True,
            '__module__': cls.__module__,
            '__name__': cls.__name__,
            'data': obj.__dict__ if not hasattr(cls, '__json_encode__') else obj.__json_encode__
        }
        return result


class GenericJSONDecoder(json.JSONDecoder):
    def decode(self, str):
        result = super().decode(str)
        return GenericJSONDecoder.instantiate_object(result)

    @staticmethod
    def instantiate_object(result):
        if not isinstance(result, dict):  # or
            if is_iterable(result):
                return [GenericJSONDecoder.instantiate_object(v) for v in result]
            else:
                return result

        if not result.get('__custom__', False):
            return {k: GenericJSONDecoder.instantiate_object(v) for k, v in result.items()}

        import sys
        module = result['__module__']
        if module not in sys.modules:
            __import__(module)
        cls = getattr(sys.modules[module], result['__name__'])
        if hasattr(cls, '__json_decode__'):
            return cls.__json_decode__(result['data'])
        instance = cls.__new__(cls)
        data = {k: GenericJSONDecoder.instantiate_object(v) for k, v in result['data'].items()}
        instance.__dict__.update(data)
        return instance


class C:

    def __init__(self):
        self.c = 133

    def __repr__(self):
        return "C<" + str(self.__dict__) + ">"


class B:

    def __init__(self):
        self.b = {'int': 123, "c": C()}
        self.l = [123, C()]
        self.t = (234, C())
        self.s = "Blah"

    def __repr__(self):
        return "B<" + str(self.__dict__) + ">"


class A:
    class_y = 13

    def __init__(self):
        self.x = B()

    def __repr__(self):
        return "A<" + str(self.__dict__) + ">"


def dumps(obj, *args, **kwargs):
    return json.dumps(obj, *args, cls=GenericJSONEncoder, **kwargs)


def dump(obj, *args, **kwargs):
    return json.dump(obj, *args, cls=GenericJSONEncoder, **kwargs)


def loads(obj, *args, **kwargs):
    return json.loads(obj, *args, cls=GenericJSONDecoder, **kwargs)


def load(obj, *args, **kwargs):
    return json.load(obj, *args, cls=GenericJSONDecoder, **kwargs)

查看:

e = dumps(A())
print("ENCODED:\n\n", e)
b = json.loads(e, cls=GenericJSONDecoder)
b = loads(e)
print("\nDECODED:\n\n", b)

打印:

 A<{'x': B<{'b': {'int': 123, 'c': C<{'c': 133}>}, 'l': [123, C<{'c': 133}>], 't': [234, C<{'c': 133}>], 's': 'Blah'}>}>

原始版本仅正确重构了 ABC 的所有实例均未实例化,而是保留为字典:

A<{'x': {'__custom__': True, '__module__': '__main__', '__name__': 'B', 'data': {'b': {'int': 123, 'c': {'__custom__': True, '__module__': '__main__', '__name__': 'C', 'data': {'c': 133}}}, 'l': [123, {'__custom__': True, '__module__': '__main__', '__name__': 'C', 'data': {'c': 133}}], 't': [234, {'__custom__': True, '__module__': '__main__', '__name__': 'C', 'data': {'c': 133}}], 's': 'Blah'}}}>

请注意,如果类型包含列表或元组之类的集合,则在解码期间无法恢复集合的实际类型。这是因为当编码为 json.

时,所有这些集合都将转换为列表