池到芹菜组
Pool to Celery group
我正在使用 Celery 启动一个任务,这个任务包括一个使用多处理和 ThreadPool 构建的端口扫描器。当我启动它时出现以下错误,寻找替代方法或教程来迁移我现有的代码:
[2015-10-25 18:53:57,090: ERROR/Worker-3] daemonic processes are not allowed to have children
Traceback (most recent call last):
File "/Users/user/Documents/OpenSource/Development/Python/test/utils/port_scanner.py", line 57, in is_port_opened
ports = list(scan_ports(server=server, port=int(port)))
File "/Users/user/Documents/OpenSource/Development/Python/test/utils/port_scanner.py", line 37, in scan_ports
p = Pool(NUM_CORES)
File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
return Pool(processes, initializer, initargs, maxtasksperchild)
File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
self._repopulate_pool()
File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 223, in _repopulate_pool
w.start()
File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 124, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
这是我的代码:
import socket
from functools import partial
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from errno import ECONNREFUSED
NUM_CORES = 4
def portscan(target, port):
try:
# Create Socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(12)
s.connect((target, port))
print 'port_scanner.is_port_opened() ' + str(port) + " is opened"
return port
except socket.error as err:
print str(err)
if err.errno == ECONNREFUSED:
return False
# Wrapper function that calls portscanner
def scan_ports(server=None, port=None, portStart=None, portEnd=None, **kwargs):
p = Pool(NUM_CORES)
ping_host = partial(portscan, server)
if portStart and portStart:
# Filter returns Booleans
return filter(bool, p.map(ping_host, range(portStart, portStart)))
else:
return filter(bool, p.map(ping_host, range(port, port + 1)))
# Check if port is opened
def is_port_opened(server=None, port=None, **kwargs):
try:
# check if its IP Address:
ports = list(scan_ports(server=server, port=int(port)))
if len(ports) != 0:
print 'port_scanner.is_port_opened() ' + str(len(ports)) + " port(s) available."
return True
else:
print 'port_scanner.is_port_opened() port not opened: (' + str(port) + ')'
return False
except Exception, e:
print str(e)
if __name__ == '__main__':
is_port_opened(server='110.10.0.144', port=22)
您的问题与 中遇到的问题非常接近。
守护进程不能生成子进程。其背后的原因是因为守护进程被其父进程强行终止。但是,如果父进程也是一个守护进程,它将被强行终止,而没有时间对其子进程执行相同的操作。
multiprocessing.Pool
使用守护进程来确保它们在程序退出时不会泄漏。我假设 Celery 对 运行 你的任务做同样的事情。因此,您在守护进程中生成守护进程。
一个可能的解决方案是使用 multiprocessing 库中的 Process
和 Queue
对象自己快速实现一个进程池。您将生成非守护进程并通过队列提供它们。
我正在使用 Celery 启动一个任务,这个任务包括一个使用多处理和 ThreadPool 构建的端口扫描器。当我启动它时出现以下错误,寻找替代方法或教程来迁移我现有的代码:
[2015-10-25 18:53:57,090: ERROR/Worker-3] daemonic processes are not allowed to have children
Traceback (most recent call last):
File "/Users/user/Documents/OpenSource/Development/Python/test/utils/port_scanner.py", line 57, in is_port_opened
ports = list(scan_ports(server=server, port=int(port)))
File "/Users/user/Documents/OpenSource/Development/Python/test/utils/port_scanner.py", line 37, in scan_ports
p = Pool(NUM_CORES)
File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
return Pool(processes, initializer, initargs, maxtasksperchild)
File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
self._repopulate_pool()
File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 223, in _repopulate_pool
w.start()
File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 124, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
这是我的代码:
import socket
from functools import partial
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from errno import ECONNREFUSED
NUM_CORES = 4
def portscan(target, port):
try:
# Create Socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(12)
s.connect((target, port))
print 'port_scanner.is_port_opened() ' + str(port) + " is opened"
return port
except socket.error as err:
print str(err)
if err.errno == ECONNREFUSED:
return False
# Wrapper function that calls portscanner
def scan_ports(server=None, port=None, portStart=None, portEnd=None, **kwargs):
p = Pool(NUM_CORES)
ping_host = partial(portscan, server)
if portStart and portStart:
# Filter returns Booleans
return filter(bool, p.map(ping_host, range(portStart, portStart)))
else:
return filter(bool, p.map(ping_host, range(port, port + 1)))
# Check if port is opened
def is_port_opened(server=None, port=None, **kwargs):
try:
# check if its IP Address:
ports = list(scan_ports(server=server, port=int(port)))
if len(ports) != 0:
print 'port_scanner.is_port_opened() ' + str(len(ports)) + " port(s) available."
return True
else:
print 'port_scanner.is_port_opened() port not opened: (' + str(port) + ')'
return False
except Exception, e:
print str(e)
if __name__ == '__main__':
is_port_opened(server='110.10.0.144', port=22)
您的问题与
multiprocessing.Pool
使用守护进程来确保它们在程序退出时不会泄漏。我假设 Celery 对 运行 你的任务做同样的事情。因此,您在守护进程中生成守护进程。
一个可能的解决方案是使用 multiprocessing 库中的 Process
和 Queue
对象自己快速实现一个进程池。您将生成非守护进程并通过队列提供它们。