基于值的线程锁

value based thread lock

如果之前有人问过这个问题,请原谅。我环顾四周,但我觉得我没有合适的词汇来通过网络搜索找到它。

我在 python 中有一个多线程应用程序。我希望能够锁定某个代码块,但仅限于具有特定条件的其他线程。我举个例子:有3个线程,thread_athread_bthread_c。每个线程可以随时通过函数 foo 运行。我不希望任何两个 bar 彼此相等的线程能够同时访问 Code block ALPHA。但是,我不想阻止 bar 值不同的线程。在这种情况下,假设 thread_a 有一个 bar == "cat" 并首先点击行 (3)。在 thread_a 命中行 (5) 之前,假设 thread_bbar == "cat" 命中行 (3)。我想让 thread_b 等待。但是如果 thread_cbar == "dog" 一起出现,我希望它能够继续下去。

(1) def foo(bar):
(2)    
(3)     lock(bar)
(4)     # Code block ALPHA (two threads with equivalent bar should not be in here)
(5)     unlock(bar)

另外请注意,bar 的可能值是完全不可预测的,但发生冲突的可能性非常高。

感谢您的帮助。我正在查看的图书馆是 python threading library

已更新

好消息:我能够通过我拼凑的有点粗糙的测试平台使用我的原始答案重现您遇到的 release_lock 问题,并使用计数机制解决问题(如您所建议的那样)- 在至少就我的测试设备而言。

现在使用两个单独的共享字典,一个像以前一样跟踪 "names" 或与每个锁关联的值,另一个跟踪在给定时间有多少线程正在使用每个锁.

和以前一样,锁名称必须是可散列的值,以便它们可以用作字典中的键。

import threading

namespace_lock = threading.Lock()
namespace = {}
counters = {}

def aquire_lock(value):
    with namespace_lock:
        if value in namespace:
            counters[value] += 1
        else:
            namespace[value] = threading.Lock()
            counters[value] = 1

    namespace[value].acquire()

def release_lock(value):
    with namespace_lock:
        if counters[value] == 1:
            del counters[value]
            lock = namespace.pop(value)
        else:
            counters[value] -= 1
            lock = namespace[value]

    lock.release()

# sample usage    
def foo(bar):
    aquire_lock(bar)
    # Code block ALPHA (two threads with equivalent bar should not be in here)
    release_lock(bar)

有一个锁,每当线程试图进入或退出临界区时获取,并为 bar 的每个值使用单独的条件变量。可能可以优化以下内容以创建更少的条件变量,但这样做 post 感觉像是过早的优化:

import collections
import contextlib
import threading

lock = threading.Lock()

wait_tracker = collections.defaultdict(lambda: (False, 0, threading.Condition(lock)))

@contextlib.contextmanager
def critical(bar):
    with lock:
        busy, waiters, condition = wait_tracker[bar]
        if busy:
            # Someone with the same bar value is in the critical section.

            # Record that we're waiting.
            waiters += 1
            wait_tracker[bar] = busy, waiters, condition

            # Wait for our turn.
            while wait_tracker[bar][0]:
                condition.wait()

            # Record that we're not waiting any more.
            busy, waiters, condition = wait_tracker[bar]
            waiters -= 1

        # Record that we're entering the critical section.
        busy = True
        wait_tracker[bar] = busy, waiters, condition
    try:
        # Critical section runs here.
        yield
    finally:
        with lock:
            # Record that we're out of the critical section.
            busy, waiters, condition = wait_tracker[bar]
            busy = False
            if waiters:
                # Someone was waiting for us. Tell them it's their turn now.
                wait_tracker[bar] = busy, waiters, condition
                condition.notify()
            else:
                # No one was waiting for us. Clean up a bit so the wait_tracker
                # doesn't grow forever.
                del wait_tracker[bar]

然后每个想要进入临界区的线程执行以下操作:

with critical(bar):
    # Critical section.

此代码未经测试,并行性很难,尤其是锁和共享内存并行性。我不保证它会起作用。

这里有一个面向 class 的解决方案,适用于需要多个单独的锁组的情况。

# A dynamic group of locks, useful for parameter based locking.
class LockGroup(object):

    def __init__(self):
        self.lock_dict = {}
        self.lock = threading.Lock()

    # Returns a lock object, unique for each unique value of param.
    # The first call with a given value of param creates a new lock, subsequent
    # calls return the same lock.
    def get_lock(self, param):
        with self.lock:
            if param not in self.lock_dict:
                self.lock_dict[param] = threading.Lock()
            return self.lock_dict[param]