为 Ray actor 函数实现缓存

Implementing cache for Ray actor function

我的目标是让下面的代码在大约 0.3 秒而不是 0.5 秒内执行。我试过在 foo 上使用 functools.lru_cachetoolz.functoolz.memoizekids.cache.cache 的装饰器,但是其中 none 已经起作用(要么是错误消息,要么是不正确执行)。我该怎么做才能完成这项工作?

import ray


@ray.remote
class Foo:
    def foo(self, x):
        print("executing foo with {}".format(x))
        time.sleep(0.1)


ray.init()
f = Foo.remote()
s = time.time()
ray.get([f.foo.remote(x=i) for i in [1, 2, 1, 4, 1]])
print(time.time()-s)
ray.shutdown()

一般警告:如果函数产生副作用,则缓存任意函数调用可能很危险。

这种情况下,想必你希望程序输出

executing foo with 1 
executing foo with 2 
executing foo with 4 

你提到的那些其他缓存工具往往不能很好地与 Ray 一起工作,因为它们试图将缓存存储在某种全局状态中,并且它们没有将该状态存储在可以访问的地方分布式的方式。由于您已经有一个演员,您可以将全局状态存储在演员中。

@ray.remote
class Foo:
    def __init__(self):
        self.foo_cache = {}

    def foo(self, x):
        def real_foo(x):
            print("executing foo with {}".format(x))
            time.sleep(0.1)
        if x not in self.foo_cache:
            self.foo_cache[x] = real_foo(x)
        return self.foo_cache[x]

这是一种非常通用的缓存技术,这里唯一重要的区别是我们必须将状态存储在 actor 中。

更通用的方法

我们还可以通过定义通用函数缓存将这种方法推广到任何 Ray 函数:

@ray.remote
class FunctionCache:
    def __init__(self, func):
        self.func = ray.remote(func)
        self.cache = {}

    def call(self, *args, **kwargs):
        if (args, kwargs) not in cache:
            cache[(args, kwargs)] = self.func(*args, **kwargs)
        return cache[(args, kwargs)]

然后为了清理我们使用它的方式,我们可以定义一个装饰器:

class RemoteFunctionLookAlike:
    def __init__(self, func):
        self.func = func

    def remote(self, *args, **kwargs):
        return self.func(*args, **kwargs)


def ray_cache(func):
    cache = FunctionCache.remote(func)
    def call_with_cache(*args, **kwargs):
        return cache.call.remote(*args, **kwargs)
    return RayFunctionLookAlike(call_with_cache)

最后,要使用这个缓存:

@ray_cache
def foo(x):
    print("Function called!")
    return abc

ray.get([foo.remote("constant") for _ in range(100)]) # Only prints "Function called!" once.