池到芹菜组

Pool to Celery group

我正在使用 Celery 启动一个任务,这个任务包括一个使用多处理和 ThreadPool 构建的端口扫描器。当我启动它时出现以下错误,寻找替代方法或教程来迁移我现有的代码:

[2015-10-25 18:53:57,090: ERROR/Worker-3] daemonic processes are not allowed to have children
Traceback (most recent call last):
  File "/Users/user/Documents/OpenSource/Development/Python/test/utils/port_scanner.py", line 57, in is_port_opened
    ports = list(scan_ports(server=server, port=int(port)))
  File "/Users/user/Documents/OpenSource/Development/Python/test/utils/port_scanner.py", line 37, in scan_ports
    p = Pool(NUM_CORES)
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
    self._repopulate_pool()
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 223, in _repopulate_pool
    w.start()
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 124, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

这是我的代码:

import socket
from functools import partial
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from errno import ECONNREFUSED

NUM_CORES = 4
def portscan(target, port):
    try:
        # Create Socket
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(12)
        s.connect((target, port))
        print 'port_scanner.is_port_opened() ' + str(port) + " is opened"
        return port
    except socket.error as err:
        print str(err)
        if err.errno == ECONNREFUSED:
            return False


# Wrapper function that calls portscanner
def scan_ports(server=None, port=None, portStart=None, portEnd=None, **kwargs):
    p = Pool(NUM_CORES)
    ping_host = partial(portscan, server)

    if portStart and portStart:
        # Filter returns Booleans
        return filter(bool, p.map(ping_host, range(portStart, portStart)))
    else:
        return filter(bool, p.map(ping_host, range(port, port + 1)))


# Check if port is opened
def is_port_opened(server=None, port=None, **kwargs):
    try:
        # check if its IP Address:
        ports = list(scan_ports(server=server, port=int(port)))
        if len(ports) != 0:
            print 'port_scanner.is_port_opened() ' + str(len(ports)) + " port(s) available."
            return True
        else:
            print 'port_scanner.is_port_opened() port not opened: (' + str(port) + ')'
            return False

    except Exception, e:
        print str(e)

if __name__ == '__main__':
    is_port_opened(server='110.10.0.144', port=22)

您的问题与 中遇到的问题非常接近。 守护进程不能生成子进程。其背后的原因是因为守护进程被其父进程强行终止。但是,如果父进程也是一个守护进程,它将被强行终止,而没有时间对其子进程执行相同的操作。

multiprocessing.Pool 使用守护进程来确保它们在程序退出时不会泄漏。我假设 Celery 对 运行 你的任务做同样的事情。因此,您在守护进程中生成守护进程。

一个可能的解决方案是使用 multiprocessing 库中的 ProcessQueue 对象自己快速实现一个进程池。您将生成非守护进程并通过队列提供它们。