如何在 Python 中传递我的 concurrent.futures.ProcessPoolExecutor 中的 "lock"?

How to pass the "lock" in my concurrent.futures.ProcessPoolExecutor in Python?

我正在 运行 在真实 Android 智能手机上使用 Python 3.7 和 Appium 1.15.1 进行并行测试。

我在每部智能手机上使用 concurrent.futures.ProcessPoolExecutor 到 运行 每次测试。

我正在将智能手机的 uid 列表传递给我的地图功能。通过这种方式,我的方法 'run_smartphone()'(开始测试)获取智能手机的 uid 并确定它必须在哪个智能手机上 运行 测试。

我的脚本工作正常,没有任何问题。但是 我想添加一个 "lock" 因为 'run_smartphone()' 在 sqlite3 数据库上做了一些 I/O。如果我错了,请纠正我,但是在这个 sqlite3 数据库上 "lock" I/O 操作是一个好习惯吗?

这是我的原始代码:

def run_smartphone(p_udid):
    #do the stuff

list_smartphones_connected = [41492968379078, 53519716736397]
with concurrent.futures.ProcessPoolExecutor() as executor:
    try:
        multiprocesses = executor.map(mymodules.run_smartphone, list_smartphones_connected)

    except ValueError:
        print(("Error multiprocesses"))

所以我尝试将 pass "lock" 添加到我的方法 'run_smartphone()' 中。这是我写的:

m = multiprocessing.Manager()
lock = m.Lock()
list_arguments_smartphones = []

list_smartphones_connected = [41492968379078, 53519716736397]
for smartphone_connected in list_smartphones_connected:        
    list_arguments_smartphones.append([smartphone_connected, lock])

with concurrent.futures.ProcessPoolExecutor() as executor:
    try:
        multiprocesses = executor.map(mymodules.run_smartphone, list_arguments_smartphones)

    except ValueError:
        print(("Error multiprocesses"))

但它不起作用,我也没有收到任何异常。 Pycharm 停止脚本:

Process finished with exit code 0

我不知道是什么阻止了脚本。

所以我开始通过使用以下行为 1 部智能手机执行脚本来进行调查:

 multiprocesses = executor.map(mymodules.run_smartphone, [41492968379078,lock])

它给出相同的结果 => 脚本停止,没有自动启动,我没有看到任何异常引发(进程完成,退出代码为 0)。

因为我想知道问题到底出在哪里,所以我 运行 脚本 'trace'。

py -m trace --trace  myscript.py

但我什么都不懂,我没有看到任何错误...您可以在我上传到 GitHub 的文本文件上看到此 'trace' 命令的输出:

https://github.com/gauthierbuttez/public/blob/master/trace-log.txt

有人知道如何将 "lock" 传递给我的 concurrent.futures.ProcessPoolExecutor() 吗? 这样做是个好主意吗?

谢谢。

来自 docs for map():

If a func call raises an exception, then that exception will be raised when its value is retrieved from the iterator.

换句话说,您可能必须实际使用 run_smartphone 的 return 值:

m = multiprocessing.Manager()
lock = m.Lock()
list_arguments_smartphones = []

list_smartphones_connected = [41492968379078, 53519716736397]
for smartphone_connected in list_smartphones_connected:        
    list_arguments_smartphones.append([smartphone_connected, lock])

with concurrent.futures.ProcessPoolExecutor() as executor:
    try:
        multiprocesses = executor.map(mymodules.run_smartphone, list_arguments_smartphones)

        for function_return_value in multiprocesses:
            print(function_return_value)
            # or do something with the value, like insert into a db

    except ValueError:
        print(("Error multiprocesses"))

但是,如果您甚至无法将锁传递给您的函数,并且您没有费心阅读文档,那么我建议您重组代码,以便 run_smartphone 仅从数据库中读取(应该't require a lock) and you write to the db in the for loop (doesn't require a lock).否则,你会陷入僵局的泥潭。

希望对您有所帮助...

    m = multiprocessing.Manager()
    lock = m.Lock()

    def run_smartphone(p_udid, lock): 
        # further code

    list_smartphones_connected = [41492968379078, 53519716736397] 
    with concurrent.futures.ProcessPoolExecutor() as executor: 
        try: 
            multiprocesses = executor.map(run_smartphone, list_smartphones_connected, [lock]*len(list_smartphones_connected)) 
            for function_return_value in multiprocesses:
                print(function_return_value)

        except ValueError: 
            print(("Error multiprocesses"))