如何 return 来自 Process 或 Thread 实例的值?

How to return values from Process- or Thread instances?

所以我想要 运行 一个可以在网络上或直接从我自己的 mysql 数据库中搜索信息的功能。 第一个过程会比较费时间,第二个比较快

考虑到这一点,我创建了一个进程来启动此复合搜索 (find_compound_view)。如果该过程完成得相对较快,则意味着它存在于数据库中,因此我可以立即呈现结果。否则,我将渲染"drax_retrieving_data.html"。

我想出的愚蠢解决方案是 运行 函数两次,一次检查该过程是否需要很长时间,另一次实际获取函数的 return 值。这主要是因为我不知道如何 return 我的 find_compound_view 函数的值。我试过谷歌搜索,但我似乎无法找到如何 return 具体地从 class 过程中获取值。

   p = Process(target=find_compound_view, args=(form,))
        p.start()
        is_running = p.is_alive()
        start_time=time.time()
        while is_running:
            time.sleep(0.05)
            is_running = p.is_alive()
            if time.time() - start_time > 10 :
                print('Timer exceeded, DRAX is retrieving info!',time.time() - start_time)
                return render(request,'drax_internal_dbs/drax_retrieving_data.html')
        compound = find_compound_view(form,use_email=False)

   if compound:
      data=*****
      return  render(request, 'drax_internal_dbs/result.html',data)

您将需要 multiprocessing.Pipemultiprocessing.Queue 将结果发送回您的父进程。如果你只是做 I/0,你应该使用 Thread 而不是 Process,因为它更轻量并且大部分时间会花在等待上。我将向您展示一般情况下如何处理进程和线程。


使用队列处理

多处理队列建立在管道之上,访问与 locks/semaphores 同步。队列是线程和进程安全的,这意味着您可以将一个队列用于多个 producer/consumer-processes 甚至这些进程中的多个线程。在队列中添加第一项也会在调用进程中启动一个供给线程。 multiprocessing.Queue 的额外开销使得在 single-producer/single-consumer 场景中使用管道更可取且性能更高。

以下是使用 multiprocessing.Queue 发送和检索结果的方法:

from multiprocessing import Process, Queue

SENTINEL = 'SENTINEL'

def sim_busy(out_queue, x):
    for _ in range(int(x)):
        assert 1 == 1
    result = x
    out_queue.put(result)
    # If all results are enqueued, send a sentinel-value to let the parent know
    # no more results will come.
    out_queue.put(SENTINEL)


if __name__ == '__main__':

    out_queue = Queue()

    p = Process(target=sim_busy, args=(out_queue, 150e6))  # 150e6 == 150000000.0
    p.start()

    for result in iter(out_queue.get, SENTINEL):  # sentinel breaks the loop
        print(result)

队列作为参数传递给函数,结果在队列上 .put(),父级 get.() 来自队列。 .get() 是一个阻塞调用,执行不会恢复,直到 得到(指定超时参数是可能的)。请注意 sim_busy 在这里所做的工作是 cpu 密集型工作,那时您会选择进程而不是线程。


工艺与管道

对于一对一连接,管道就足够了。设置几乎相同,只是方法的命名不同以及对 Pipe() returns 两个连接对象的调用。在双工模式下,两个对象都是读写端,duplex=False(单工)第一个连接对象是管道的读端,第二个是写端。在这个基本场景中,我们只需要一个单纯形管道:

from multiprocessing import Process, Pipe

SENTINEL = 'SENTINEL'


def sim_busy(write_conn, x):
    for _ in range(int(x)):
        assert 1 == 1
    result = x
    write_conn.send(result)
    # If all results are send, send a sentinel-value to let the parent know
    # no more results will come.
    write_conn.send(SENTINEL)


if __name__ == '__main__':

    # duplex=False because we just need one-way communication in this case.
    read_conn, write_conn = Pipe(duplex=False)

    p = Process(target=sim_busy, args=(write_conn, 150e6))  # 150e6 == 150000000.0
    p.start()

    for result in iter(read_conn.recv, SENTINEL):  # sentinel breaks the loop
        print(result)

线程和队列

为了与线程一起使用,您想切换到 queue.Queuequeue.Queue 建立在 collections.deque 之上,添加了一些锁以使其线程安全。与 multiprocessing 的队列和管道不同,放在 queue.Queue 上的对象不会被 pickle。由于线程共享相同的内存地址-space,不需要序列化内存复制,只传输指针。

from threading import Thread
from queue import Queue
import time

SENTINEL = 'SENTINEL'


def sim_io(out_queue, query):
    time.sleep(1)
    result = query + '_result'
    out_queue.put(result)
    # If all results are enqueued, send a sentinel-value to let the parent know
    # no more results will come.
    out_queue.put(SENTINEL)


if __name__ == '__main__':

    out_queue = Queue()

    p = Thread(target=sim_io, args=(out_queue, 'my_query'))
    p.start()

    for result in iter(out_queue.get, SENTINEL):  # sentinel-value breaks the loop
        print(result)

  • 阅读为什么for result in iter(out_queue.get, SENTINEL): 在可能的情况下,应该优先于 while True...break 设置。
  • 阅读 为什么应该在所有脚本中使用 if __name__ == '__main__':,尤其是在多处理中。
  • 有关 get() 用法的更多信息