在多个独立进程中使用 Python RLocks
Using Python RLocks across multiple independent processes
我正在开发一个使用 Celery 来安排一些长期任务的 Django 项目。 Django 和 Celery 运行 在完全独立的进程中,需要一种方法来协调对数据库的访问。我想使用 Python 的 multiprocessing.RLock
class (或等效的),因为我需要锁是可重入的。
我的问题是,如何为单独的进程提供对 RLock 的访问权限?
我发现的两个最佳解决方案 (posix_ipc module and fcntl) 仅限于基于 Unix 的系统,我们希望避免局限于此。
有没有一种跨平台的方法可以在没有共同祖先进程的情况下在进程之间共享锁?
我最终使用 RabbitMQ 作为创建分布式锁的一种方式。有关如何执行此操作的详细信息,请参阅 RabbitMQ 的博客:https://www.rabbitmq.com/blog/2014/02/19/distributed-semaphores-with-rabbitmq/.
简而言之,您为锁创建一个 RabbitMQ 队列并向其发送一条消息。要获取锁,请在队列上 运行 一个 basic_get
(非阻塞)或 basic_consume
(阻塞)。这将从队列中删除消息,防止其他线程获取锁。一旦你的工作完成,发送一个否定的确认将导致 RabbitMQ 重新排队消息,允许下一个线程继续。
不幸的是,这不允许重入锁。
上面引用的 link 给出了 Java 代码,说明如何执行此操作。弄清楚如何将其转换为 Python/Pika 已经够烦人了,我想我应该 post 一些示例代码在这里。
生成锁:
import pika
with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
channel = connection.channel()
channel.queue_declare(queue="LockQueue")
channel.basic_publish(exchange='', routing_key='LockQueue', body='Lock')
channel.close()
获取锁:
import pika
import time
def callback(ch, method, properties, body):
print("Got lock")
for i in range(5, 0, -1):
print("Tick {}".format(i))
time.sleep(1)
print("Releasing lock")
ch.basic_nack(delivery_tag=method.delivery_tag)
ch.close() # Close the channel to continue on with normal processing. Without this, `callback` will continue to request the lock.
with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
channel = connection.channel()
channel.queue_declare(queue='LockQueue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='LockQueue')
print("Waiting for lock")
channel.start_consuming()
print("Task completed")
我正在开发一个使用 Celery 来安排一些长期任务的 Django 项目。 Django 和 Celery 运行 在完全独立的进程中,需要一种方法来协调对数据库的访问。我想使用 Python 的 multiprocessing.RLock
class (或等效的),因为我需要锁是可重入的。
我的问题是,如何为单独的进程提供对 RLock 的访问权限?
我发现的两个最佳解决方案 (posix_ipc module and fcntl) 仅限于基于 Unix 的系统,我们希望避免局限于此。
有没有一种跨平台的方法可以在没有共同祖先进程的情况下在进程之间共享锁?
我最终使用 RabbitMQ 作为创建分布式锁的一种方式。有关如何执行此操作的详细信息,请参阅 RabbitMQ 的博客:https://www.rabbitmq.com/blog/2014/02/19/distributed-semaphores-with-rabbitmq/.
简而言之,您为锁创建一个 RabbitMQ 队列并向其发送一条消息。要获取锁,请在队列上 运行 一个 basic_get
(非阻塞)或 basic_consume
(阻塞)。这将从队列中删除消息,防止其他线程获取锁。一旦你的工作完成,发送一个否定的确认将导致 RabbitMQ 重新排队消息,允许下一个线程继续。
不幸的是,这不允许重入锁。
上面引用的 link 给出了 Java 代码,说明如何执行此操作。弄清楚如何将其转换为 Python/Pika 已经够烦人了,我想我应该 post 一些示例代码在这里。
生成锁:
import pika
with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
channel = connection.channel()
channel.queue_declare(queue="LockQueue")
channel.basic_publish(exchange='', routing_key='LockQueue', body='Lock')
channel.close()
获取锁:
import pika
import time
def callback(ch, method, properties, body):
print("Got lock")
for i in range(5, 0, -1):
print("Tick {}".format(i))
time.sleep(1)
print("Releasing lock")
ch.basic_nack(delivery_tag=method.delivery_tag)
ch.close() # Close the channel to continue on with normal processing. Without this, `callback` will continue to request the lock.
with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
channel = connection.channel()
channel.queue_declare(queue='LockQueue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='LockQueue')
print("Waiting for lock")
channel.start_consuming()
print("Task completed")