How to process input in parallel with python, but without processes?

I have a list of input data that I want to process in parallel, but processing each item takes time because network IO is involved. CPU usage is not a concern.

I don't want the overhead of extra processes, since I have a lot going on at once and don't want to set up inter-process communication.

# the parallel execution equivalent of this?
import time
input_data = [1,2,3,4,5,6,7]
input_processor = time.sleep
results = map(input_processor, input_data)

The code I'm working with uses twisted.internet.defer, so a solution involving that is fine as well.

You can use the multiprocessing module. Without knowing more about how you want the items processed, you could use a pool of workers:

import multiprocessing as mp
import time

input_processor = time.sleep
core_num = mp.cpu_count()
pool = mp.Pool(processes=core_num)
# submit each item asynchronously; results come back in submission order
results = [pool.apply_async(input_processor, (i,)) for i in range(1, 7 + 1)]
result_final = [p.get() for p in results]
for n, r in enumerate(result_final):
    print(n, r)

The above collects each task's result in the order the tasks were submitted. It also does not let the processes talk to each other.
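
If all you need is the parallel equivalent of the map call from the question, a minimal sketch using pool.map (assuming the same input_data and input_processor as above) stays even closer to the original:

import multiprocessing as mp
import time

input_data = [1, 2, 3, 4, 5, 6, 7]
input_processor = time.sleep

if __name__ == '__main__':
    pool = mp.Pool(processes=mp.cpu_count())
    # blocks until every item is processed and returns the
    # results in the same order as input_data
    results = pool.map(input_processor, input_data)
    pool.close()
    pool.join()
    print(results)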

Edited: to call it as a function, you should pass in the input data and the number of processes:

import numpy as np

def parallel_map(processor_count, input_data):
    pool = mp.Pool(processes=processor_count)
    results = [pool.apply_async(input_processor, (i,)) for i in input_data]
    result_final = np.array([p.get() for p in results])
    # stack the inputs on top of their results
    result_data = np.vstack((input_data, result_final))
    return result_data
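
A quick usage sketch (slow_double is a hypothetical processor that actually returns a value, since time.sleep only returns None):

def slow_double(x):
    time.sleep(0.1)  # simulate network IO
    return 2 * x


if __name__ == '__main__':
    input_processor = slow_double
    print(parallel_map(4, [1, 2, 3, 4, 5, 6, 7]))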

You can easily define Worker threads that work in parallel until the queue is empty.

from threading import Thread
from collections import deque
import time


# Create a new class that inherits from Thread
class Worker(Thread):

    def __init__(self, inqueue, outqueue, func):
        '''
        A worker that calls func on objects in inqueue and
        pushes the result into outqueue

        runs until inqueue is empty
        '''

        self.inqueue = inqueue
        self.outqueue = outqueue
        self.func = func
        super().__init__()

    # override the run method; this is started when
    # you call worker.start()
    def run(self):
        while True:
            try:
                # deque.popleft is thread-safe, but another worker may
                # drain the queue first, so catch the empty case
                data = self.inqueue.popleft()
            except IndexError:
                break
            print('start')
            result = self.func(data)
            self.outqueue.append(result)
            print('finished')


def test(x):
    time.sleep(x)
    return 2 * x


if __name__ == '__main__':
    data = 12 * [1, ]
    queue = deque(data)
    result = deque()

    # create 3 workers working on the same input
    workers = [Worker(queue, result, test) for _ in range(3)]

    # start the workers
    for worker in workers:
        worker.start()

    # wait till all workers are finished
    for worker in workers:
        worker.join()

    print(result)

As expected, this takes about 4 seconds: 12 one-second tasks split across 3 workers come to roughly 12 / 3 = 4 seconds of wall time.

You can also write a simple Pool class to remove the noise from the main function:

from threading import Thread
from collections import deque
import time


class Pool():

    def __init__(self, n_threads):
        self.n_threads = n_threads

    def map(self, func, data):
        inqueue = deque(data)
        result = deque()

        workers = [Worker(inqueue, result, func) for i in range(self.n_threads)]

        for worker in workers:
            worker.start()

        for worker in workers:
            worker.join()

        return list(result)


class Worker(Thread):

    def __init__(self, inqueue, outqueue, func):
        '''
        A worker that calls func on objects in inqueue and
        pushes the result into outqueue

        runs until inqueue is empty
        '''

        self.inqueue = inqueue
        self.outqueue = outqueue
        self.func = func
        super().__init__()

    # override the run method; this is started when
    # you call worker.start()
    def run(self):
        while True:
            try:
                # deque.popleft is thread-safe, but another worker may
                # drain the queue first, so catch the empty case
                data = self.inqueue.popleft()
            except IndexError:
                break
            print('start')
            result = self.func(data)
            self.outqueue.append(result)
            print('finished')


def test(x):
    time.sleep(x)
    return 2 * x


if __name__ == '__main__':
    data = 12 * [1, ]

    pool = Pool(6)
    result = pool.map(test, data)

    print(result)

I assume you are using Twisted. In that case, you can start multiple deferreds and use a DeferredList to wait for all of them to complete:

http://twistedmatrix.com/documents/15.4.0/core/howto/defer.html#deferredlist

If input_processor is a non-blocking call (returns a deferred):

from twisted.internet import defer


def main():
    input_data = [1, 2, 3, 4, 5, 6, 7]
    input_processor = asyn_function

    requests = []
    for entry in input_data:
        requests.append(defer.maybeDeferred(input_processor, entry))
    deferredList = defer.DeferredList(requests, consumeErrors=True)
    deferredList.addCallback(gotResults)
    return deferredList


def gotResults(results):
    for (success, value) in results:
        if success:
            print('Success:', value)
        else:
            print('Failure:', value.getErrorMessage())

If input_processor is a long/blocking function, you can use deferToThread instead of maybeDeferred:

from twisted.internet import defer, threads


def main():
    input_data = [1, 2, 3, 4, 5, 6, 7]
    input_processor = syn_function

    requests = []
    for entry in input_data:
        requests.append(threads.deferToThread(input_processor, entry))
    deferredList = defer.DeferredList(requests, consumeErrors=True)
    deferredList.addCallback(gotResults)
    return deferredList
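
Either version still needs a running reactor. A minimal sketch of driving it with twisted.internet.task.react, which passes the reactor as the first argument (so main takes an extra parameter); blocking_processor here is a hypothetical stand-in for the real blocking call:

import time

from twisted.internet import defer, task, threads


def blocking_processor(x):
    # hypothetical stand-in for the blocking network call
    time.sleep(x)
    return 2 * x


def got_results(results):
    for success, value in results:
        if success:
            print('Success:', value)
        else:
            print('Failure:', value.getErrorMessage())


def main(reactor):
    input_data = [1, 2, 3]
    requests = [threads.deferToThread(blocking_processor, entry)
                for entry in input_data]
    deferred_list = defer.DeferredList(requests, consumeErrors=True)
    deferred_list.addCallback(got_results)
    return deferred_list


if __name__ == '__main__':
    task.react(main)

react stops the reactor once the DeferredList fires, so the script exits when all items have been processed.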