当迭代次数足够多时,Dask 多处理失败,并出现令人尴尬的并行循环,包括调用 MongoDB
Dask multiprocessing fails with embarrassingly parallel for loop including call to MongoDB when number of iterations is high enough
我正在尝试 运行 使用 Dask 多处理在 Python for 循环中进行一种并行模拟。并行化在迭代次数相当少时工作正常,但在迭代次数增加时失败。该问题出现在 Win7(4 核,10 Gb RAM)、Win10(8 核,8 Gb RAM)和 Azure VM 运行ning Windows Server 2016(16 核,32 Gb RAM)上。最慢的,Win7,可以在失败之前经历大多数迭代。可以通过在流程中包含的每个函数结束时添加足够长的休眠时间来缓解此问题,但所需的休眠时间会导致性能非常低,类似于 运行ning sequentially。
我希望有人能在这里帮助我。在此先感谢您的评论和回答!
以下简单代码包含 for 循环的某些阶段并重复错误。
import json
import pandas as pd
from pymongo import MongoClient
# Create random DataFrame
df = pd.DataFrame(np.random.randint(0,100,size=(100,11)), columns=list('ABCDEFGHIJK'))
# Save to Mongo
client = MongoClient()
db = client.errordemo
res = db.errordemo.insert_many(json.loads(df.to_json(orient='records')))
db.client.close()
class ToBeRunParallel:
def __init__(self):
pass
def functionToBeRunParallel(self, i):
# Read data from mongo
with MongoClient() as client:
db = client.errordemo
dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))
# Randomize data
dataRand = dataFromMongo.apply(pd.to_numeric).apply(rand, volatility=0.1)
# Sum rows
dataSum = dataRand.sum(axis=1)
# Select randomly one of the resulting values and return
return dataSum.sample().values[0]
在控制台或 Jupyter 中调用函数 functionToBeRunParallel(均失败)。 'errordemo' 是包含 class ToBeRunParallel 的本地模块。 运行在 Azure VM 上运行时,代码在 500 次循环中成功,在 5,000 次循环中失败。
import errordemo
from dask import delayed, compute, multiprocessing
# Determine how many times to loop
rng = range(15000)
# Define empty result lists
resList = []
# Create instance
err = errordemo.ToBeRunParallel()
# Loop in parallel using Dask
for i in rng:
sampleValue = delayed(err.functionToBeRunParallel)(i)
resList.append(sampleValue)
# Compute in parallel
result = compute(*resList, get=multiprocessing.get)
Jupyter中的错误堆栈如下
---------------------------------------------------------------------------
AutoReconnect Traceback (most recent call last)
<ipython-input-3-9f535dd4c621> in <module>()
----> 1 get_ipython().run_cell_magic('time', '', '# Determine how many times to loop\nrng = range(50000)\n\n# Define empty result lists\nresList = []\n\n# Create instance\nerr = errordemo.ToBeRunParallel()\n\n# Loop in parallel using Dask\nfor i in rng:\n sampleValue = delayed(err.functionToBeRunParallel)(i)\n resList.append(sampleValue)\n \n# Compute in parallel \nresult = compute(*resList, get=dask.multiprocessing.get)')
C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
2113 magic_arg_s = self.var_expand(line, stack_depth)
2114 with self.builtin_trap:
-> 2115 result = fn(magic_arg_s, cell)
2116 return result
2117
<decorator-gen-60> in time(self, line, cell, local_ns)
C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\magic.py in <lambda>(f, *a, **k)
186 # but it's overkill for just that one bit of state.
187 def magic_deco(arg):
--> 188 call = lambda f, *a, **k: f(*a, **k)
189
190 if callable(arg):
C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\magics\execution.py in time(self, line, cell, local_ns)
1178 else:
1179 st = clock2()
-> 1180 exec(code, glob, local_ns)
1181 end = clock2()
1182 out = None
<timed exec> in <module>()
C:\ProgramData\Anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
200 dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
201 keys = [var._keys() for var in variables]
--> 202 results = get(dsk, keys, **kwargs)
203
204 results_iter = iter(results)
C:\ProgramData\Anaconda3\lib\site-packages\dask\multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
85 result = get_async(pool.apply_async, len(pool._pool), dsk3, keys,
86 get_id=_process_get_id,
---> 87 dumps=dumps, loads=loads, **kwargs)
88 finally:
89 if cleanup:
C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs)
498 _execute_task(task, data) # Re-execute locally
499 else:
--> 500 raise(remote_exception(res, tb))
501 state['cache'][key] = res
502 finish_task(dsk, key, state, results, keyorder.get)
AutoReconnect: localhost:27017: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted
Traceback
---------
File "C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py", line 266, in execute_task
result = _execute_task(task, data)
File "C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py", line 247, in _execute_task
return func(*args2)
File "C:\Git_repository\footie\Pipeline\errordemo.py", line 20, in functionToBeRunParallel
dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))
File "C:\ProgramData\Anaconda3\lib\site-packages\pandas\core\frame.py", line 981, in from_records
first_row = next(data)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 1090, in next
if len(self.__data) or self._refresh():
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 1012, in _refresh
self.__read_concern))
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 850, in __send_message
**kwargs)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\mongo_client.py", line 844, in _send_message_with_response
exhaust)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\mongo_client.py", line 855, in _reset_on_error
return func(*args, **kwargs)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\server.py", line 99, in send_message_with_response
with self.get_socket(all_credentials, exhaust) as sock_info:
File "C:\ProgramData\Anaconda3\lib\contextlib.py", line 82, in __enter__
return next(self.gen)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\server.py", line 163, in get_socket
with self.pool.get_socket(all_credentials, checkout) as sock_info:
File "C:\ProgramData\Anaconda3\lib\contextlib.py", line 82, in __enter__
return next(self.gen)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 582, in get_socket
sock_info = self._get_socket_no_auth()
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 618, in _get_socket_no_auth
sock_info, from_pool = self.connect(), False
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 555, in connect
_raise_connection_failure(self.address, error)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 65, in _raise_connection_failure
raise AutoReconnect(msg)
更新:
在 this post 之后,我创建了一个装饰器来捕获 AutoReconnect 异常,如下所示。与 MongoClient 的参数一起循环工作,但它仍然很慢,是它应该花费的时间的两倍。 (在 Azure VM 上计时):
500次迭代:3.74s
50,000 次迭代:12 分钟 12 秒
def safe_mongocall(call):
def _safe_mongocall(*args, **kwargs):
for i in range(5):
try:
return call(*args, **kwargs)
except errors.AutoReconnect:
sleep(random.random() / 100)
print('Error: Failed operation!')
return _safe_mongocall
@safe_mongocall
def functionToBeRunParallel(self, i):
# Read data from mongo
with MongoClient(connect=False, maxPoolSize=None, maxIdleTimeMS=100) as client:
db = client.errordemo
dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))
# Randomize data
dataRand = dataFromMongo.apply(pd.to_numeric).apply(rand, volatility=0.1)
# Sum rows
dataSum = dataRand.sum(axis=1)
# Select randomly one of the resulting values and return
return dataSum.sample().values[0]
实际问题是 TCP/IP 端口耗尽,因此解决方案是避免耗尽。在 article by Microsoft 之后,我将以下注册表项和值添加到 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters:
最大用户端口:65534
TcpTimedWaitDelay: 30
我正在尝试 运行 使用 Dask 多处理在 Python for 循环中进行一种并行模拟。并行化在迭代次数相当少时工作正常,但在迭代次数增加时失败。该问题出现在 Win7(4 核,10 Gb RAM)、Win10(8 核,8 Gb RAM)和 Azure VM 运行ning Windows Server 2016(16 核,32 Gb RAM)上。最慢的,Win7,可以在失败之前经历大多数迭代。可以通过在流程中包含的每个函数结束时添加足够长的休眠时间来缓解此问题,但所需的休眠时间会导致性能非常低,类似于 运行ning sequentially。
我希望有人能在这里帮助我。在此先感谢您的评论和回答!
以下简单代码包含 for 循环的某些阶段并重复错误。
import json
import pandas as pd
from pymongo import MongoClient
# Create random DataFrame
df = pd.DataFrame(np.random.randint(0,100,size=(100,11)), columns=list('ABCDEFGHIJK'))
# Save to Mongo
client = MongoClient()
db = client.errordemo
res = db.errordemo.insert_many(json.loads(df.to_json(orient='records')))
db.client.close()
class ToBeRunParallel:
def __init__(self):
pass
def functionToBeRunParallel(self, i):
# Read data from mongo
with MongoClient() as client:
db = client.errordemo
dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))
# Randomize data
dataRand = dataFromMongo.apply(pd.to_numeric).apply(rand, volatility=0.1)
# Sum rows
dataSum = dataRand.sum(axis=1)
# Select randomly one of the resulting values and return
return dataSum.sample().values[0]
在控制台或 Jupyter 中调用函数 functionToBeRunParallel(均失败)。 'errordemo' 是包含 class ToBeRunParallel 的本地模块。 运行在 Azure VM 上运行时,代码在 500 次循环中成功,在 5,000 次循环中失败。
import errordemo
from dask import delayed, compute, multiprocessing
# Determine how many times to loop
rng = range(15000)
# Define empty result lists
resList = []
# Create instance
err = errordemo.ToBeRunParallel()
# Loop in parallel using Dask
for i in rng:
sampleValue = delayed(err.functionToBeRunParallel)(i)
resList.append(sampleValue)
# Compute in parallel
result = compute(*resList, get=multiprocessing.get)
Jupyter中的错误堆栈如下
---------------------------------------------------------------------------
AutoReconnect Traceback (most recent call last)
<ipython-input-3-9f535dd4c621> in <module>()
----> 1 get_ipython().run_cell_magic('time', '', '# Determine how many times to loop\nrng = range(50000)\n\n# Define empty result lists\nresList = []\n\n# Create instance\nerr = errordemo.ToBeRunParallel()\n\n# Loop in parallel using Dask\nfor i in rng:\n sampleValue = delayed(err.functionToBeRunParallel)(i)\n resList.append(sampleValue)\n \n# Compute in parallel \nresult = compute(*resList, get=dask.multiprocessing.get)')
C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
2113 magic_arg_s = self.var_expand(line, stack_depth)
2114 with self.builtin_trap:
-> 2115 result = fn(magic_arg_s, cell)
2116 return result
2117
<decorator-gen-60> in time(self, line, cell, local_ns)
C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\magic.py in <lambda>(f, *a, **k)
186 # but it's overkill for just that one bit of state.
187 def magic_deco(arg):
--> 188 call = lambda f, *a, **k: f(*a, **k)
189
190 if callable(arg):
C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\magics\execution.py in time(self, line, cell, local_ns)
1178 else:
1179 st = clock2()
-> 1180 exec(code, glob, local_ns)
1181 end = clock2()
1182 out = None
<timed exec> in <module>()
C:\ProgramData\Anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
200 dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
201 keys = [var._keys() for var in variables]
--> 202 results = get(dsk, keys, **kwargs)
203
204 results_iter = iter(results)
C:\ProgramData\Anaconda3\lib\site-packages\dask\multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
85 result = get_async(pool.apply_async, len(pool._pool), dsk3, keys,
86 get_id=_process_get_id,
---> 87 dumps=dumps, loads=loads, **kwargs)
88 finally:
89 if cleanup:
C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs)
498 _execute_task(task, data) # Re-execute locally
499 else:
--> 500 raise(remote_exception(res, tb))
501 state['cache'][key] = res
502 finish_task(dsk, key, state, results, keyorder.get)
AutoReconnect: localhost:27017: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted
Traceback
---------
File "C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py", line 266, in execute_task
result = _execute_task(task, data)
File "C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py", line 247, in _execute_task
return func(*args2)
File "C:\Git_repository\footie\Pipeline\errordemo.py", line 20, in functionToBeRunParallel
dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))
File "C:\ProgramData\Anaconda3\lib\site-packages\pandas\core\frame.py", line 981, in from_records
first_row = next(data)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 1090, in next
if len(self.__data) or self._refresh():
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 1012, in _refresh
self.__read_concern))
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 850, in __send_message
**kwargs)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\mongo_client.py", line 844, in _send_message_with_response
exhaust)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\mongo_client.py", line 855, in _reset_on_error
return func(*args, **kwargs)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\server.py", line 99, in send_message_with_response
with self.get_socket(all_credentials, exhaust) as sock_info:
File "C:\ProgramData\Anaconda3\lib\contextlib.py", line 82, in __enter__
return next(self.gen)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\server.py", line 163, in get_socket
with self.pool.get_socket(all_credentials, checkout) as sock_info:
File "C:\ProgramData\Anaconda3\lib\contextlib.py", line 82, in __enter__
return next(self.gen)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 582, in get_socket
sock_info = self._get_socket_no_auth()
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 618, in _get_socket_no_auth
sock_info, from_pool = self.connect(), False
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 555, in connect
_raise_connection_failure(self.address, error)
File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 65, in _raise_connection_failure
raise AutoReconnect(msg)
更新:
在 this post 之后,我创建了一个装饰器来捕获 AutoReconnect 异常,如下所示。与 MongoClient 的参数一起循环工作,但它仍然很慢,是它应该花费的时间的两倍。 (在 Azure VM 上计时):
500次迭代:3.74s
50,000 次迭代:12 分钟 12 秒
def safe_mongocall(call):
def _safe_mongocall(*args, **kwargs):
for i in range(5):
try:
return call(*args, **kwargs)
except errors.AutoReconnect:
sleep(random.random() / 100)
print('Error: Failed operation!')
return _safe_mongocall
@safe_mongocall
def functionToBeRunParallel(self, i):
# Read data from mongo
with MongoClient(connect=False, maxPoolSize=None, maxIdleTimeMS=100) as client:
db = client.errordemo
dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))
# Randomize data
dataRand = dataFromMongo.apply(pd.to_numeric).apply(rand, volatility=0.1)
# Sum rows
dataSum = dataRand.sum(axis=1)
# Select randomly one of the resulting values and return
return dataSum.sample().values[0]
实际问题是 TCP/IP 端口耗尽,因此解决方案是避免耗尽。在 article by Microsoft 之后,我将以下注册表项和值添加到 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters:
最大用户端口:65534
TcpTimedWaitDelay: 30