多处理超时?

Timeout for multiprocessing?

此代码对各种机器执行 ping 操作。您能否帮我更改此代码,以便如果 ping 进程挂起超过 7 秒,它会关闭并 returns 一些标志?

(我想从使用 WMI 的机器中提取各种数据。为此,我会将 ping 功能更改为其他功能。问题是在某些机器上 WMI 已损坏,并且提取数据的过程无限期挂起。超时是需要。)

import multiprocessing.dummy
import subprocess
import numpy as np
import time

start_time = time.time()

def ping(ipadd):
    try:
        response = subprocess.check_output(['ping', ipadd])
        return True
    except subprocess.CalledProcessError as e:
        return False
#print(ping('10.25.59.20'))
machine_names = \
'''
ya.ru
microsoft.com
www.google.com
www.amazon.com
www.nasa.com
'''.split()

np_machine_names = np.array(machine_names)
p = multiprocessing.dummy.Pool(7)
ping_status = p.map(ping, machine_names)
np_ping_status = np.fromiter(ping_status, dtype=bool)
print(*np_machine_names[np_ping_status], sep = '\n')


run_time = time.time() - start_time
print(f'Runtime: {run_time:.0f}')

更新: 虽然我很欣赏关于为子进程添加超时的提示,但问题仍然存在。如何关闭挂起的功能?假设我已经将 ping 更改为从一台机器中提取 WMI 数据(这个从 Windows 机器中提取已安装软件的列表)。没有设置定时器的子进程:

#pip install pypiwin32
import win32com.client 
strComputer = "." 
objWMIService = win32com.client.Dispatch("WbemScripting.SWbemLocator") 
objSWbemServices = objWMIService.ConnectServer(strComputer,"root\cimv2") 
colItems = objSWbemServices.ExecQuery("Select * from Win32_Product") 
for objItem in colItems: 
    print( "Caption: ", objItem.Caption )

使用 asyncio 它自 3.5.4

起在 python 中可用

https://docs.python.org/3/library/asyncio-task.html

Popen 是使用 subprocess 模块时的默认选择。它允许您创建一个进程,然后在指定的超时时间内读取其标准输出和标准错误:

def ping(ipadd):
    process = subprocess.Popen(['ping', ipadd])
    try:
        response, stderr_response = process.communicate(timeout=10)
        return True
    except subprocess.TimeoutExpired:
        return False
    finally:
        process.kill()

此外,请注意 linux 或 osx 上的 ping 可能永远不会退出并继续 ping,因此这将在这些操作系统上 return 错误:

>>> ping('127.0.0.1')
PING 127.0.0.1 (127.0.0.1): 56 data bytes
64 bytes from 127.0.0.1: icmp_seq=0 ttl=64 time=0.058 ms
64 bytes from 127.0.0.1: icmp_seq=1 ttl=64 time=0.033 ms
...
64 bytes from 127.0.0.1: icmp_seq=8 ttl=64 time=0.064 ms
64 bytes from 127.0.0.1: icmp_seq=9 ttl=64 time=0.031 ms
False

有几种方法可以解决长时间 运行 执行的问题。每种方式都有其优点和缺点。

API

如前所述,最简单的方法是依赖 API 超时。 subprocesssocketrequests 等模块...在其 API 中公开超时参数。

只要可行,这是更可取的方法。

线程数

long-running/hanging 逻辑在单独的线程中执行。主循环可以不受干扰地继续执行并忽略挂起执行。

import threading

TIMEOUT = 60

def hanging_function():
    hang_here()

thread = threading.Tread(target=hanging_function)
thread.daemon = True
thread.start()

thread.join(TIMEOUT)
if thread.is_alive():
    print("Function is hanging!")

这种方法的一个问题是挂起的线程将继续在后台执行消耗资源。

另一个限制是由于线程共享内存。如果您的函数碰巧严重崩溃,它也可能会影响您的主要执行。

进程

我最喜欢的方法是使用 multiprocessing 工具在单独的进程中执行有问题的逻辑。由于进程不共享内存,因此有问题的函数中发生的任何事情都仅限于您可以随时终止的子进程。

import multiprocessing

TIMEOUT = 60

def hanging_function():
    hang_here()

process = multiprocessing.Process(target=hanging_function)
process.daemon = True
process.start()

process.join(TIMEOUT)
if process.is_alive():
    print("Function is hanging!")
    process.terminate()
    print("Kidding, just terminated!")

pebble 库就是建立在这一原则之上的。允许轻松分离有问题的代码并处理故障和灾难。

使用进程的缺点是它们比其他两种方法要重一些。此外,由于进程之间的内存是隔离的,共享数据会稍微复杂一些。