如何检查 multiprocessing.Lock() 是否被锁定?

How to check if a multiprocessing.Lock() is locked?

我正在尝试制作一个可以被多个进程访问的共享资源,但它可以同时处理的请求数量有限。在最终产品中,此共享资源基于 API 进行数据请求。

因为我要提出很多请求,所以我想建立多个单独的连接来加快速度。在这种情况下,我想同时建立三个连接并在这些连接上划分请求。

信号量似乎起作用并将同时函数调用的数量限制为三个。但是,我似乎无法将三个调用的负载分散到不同的函数上,因为 multiprocessing.Lock 不支持 locked() 方法,就像它的线程等效方法一样。

我想知道是否有人可以帮助我将这三个调用分散到不同的启动会话中,或者对如何以不同的方式解决这个问题有很好的建议?

非常非常感谢!

import time

import multiprocessing
import concurrent.futures

from multiprocessing.managers import BaseManager

class Tester:
    def __init__(self, sessions=3):
        self.semaphore = multiprocessing.BoundedSemaphore(sessions)

        self.locka = multiprocessing.Lock()
        self.lockb = multiprocessing.Lock()
        self.lockc = multiprocessing.Lock()

    def call(self, name):
        with self.semaphore:

            while True:
                if not self.locka.locked():
                    with self.locka:
                        time.sleep(1)
                        return self.session_a(name)

                if not self.lockb.locked():    
                    with self.lockb:
                        time.sleep(1)
                        return self.session_b(name)

                if not self.lockc.locked():
                    with self.lockc:
                        time.sleep(1)
                        return self.session_c(name) 

    def session_a(self, name):
        print(f'session_a:  {name}')

    def session_b(self, name):
        print(f'session_b:  {name}')

    def session_c(self):
        print(f'session_c:  {name}')


def call_object(obj, name):
    obj.call(name)


def main():
    BaseManager.register('Tester', Tester)
    manager = BaseManager()

    manager.start()
    inst = manager.Tester()

    with concurrent.futures.ProcessPoolExecutor() as executor:
        names = ['Alex', 'Brain', 'Carl', 'Derek', 'Eric', 'Frank', 'George', 'Harold']
        futures = [executor.submit(call_object, inst, name) for name in names]


if __name__ == "__main__":
    main()

仅使用锁并且:

  • 循环 locks/sessions 直到获得锁,然后
  • 得到结果
  • return结果

尝试获取锁时不要阻塞,这样您就可以尝试下一个锁 - 它会 return 如果获取到,则为 True,如果未获取,则为 False。


import time
import multiprocessing
import concurrent.futures

from multiprocessing.managers import BaseManager

class Tester:
    def __init__(self, sessions=3):
        self.locks = [multiprocessing.Lock(),
                      multiprocessing.Lock(),
                      multiprocessing.Lock()]

    def call(self, name):
        # cycle through locks/sessions till a lock is acquired and
        # a result is obtained
        # return the result
        done = False
        while not done:
            for lock,session in zip(self.locks,
                                    [self.session_a,
                                     self.session_b,
                                     self.session_c]):
                acq = lock.acquire(block=False)
                if acq:
                    #print(f'lock acquired for {session.__name__}:{name}',file=sys.stdout)
                    try:
                        time.sleep(1)
                        result = session(name)
                    finally:
                        lock.release()
                        #print(f'lock for {session.__name__} released')
                        done = True
                    #print(result)
                    break
        return result + '!!'


    def session_a(self, name):
        #print(f'in method session_a:  {name}')
        return f'session_a:  {name}'

    def session_b(self, name):
        #print(f'in method session_b:  {name}')
        return f'session_b:  {name}'

    def session_c(self,name):
        #print(f'in method session_c:  {name}')
        return f'session_c:  {name}'

def call_object(obj, name):
    return obj.call(name)

def main():
    BaseManager.register('Tester', Tester)
    manager = BaseManager()

    manager.start()
    inst = manager.Tester()

    with concurrent.futures.ProcessPoolExecutor() as executor:
        names = ['Alex', 'Brain', 'Carl', 'Derek', 'Eric', 'Frank', 'George', 'Harold']
        futures = [executor.submit(call_object, inst, name) for name in names]
        for fut in concurrent.futures.as_completed(futures):
            print(f'future result {fut.result()}')