TypeError: cannot pickle '_thread.lock' object Dask compute
I am trying to do multiprocessing with dask. I have a function that has to run for 10,000 files and will generate files as output. The function takes a file from an S3 bucket as input and works with another file in S3 that has a similar date and time. I am doing everything in JupyterLab.
So here is my function:
# imports used below (assumed to be available in the notebook)
import gzip
import zipfile

import numpy as np
import pandas as pd
import s3fs
import xarray as xr

def get_temp(file, name):
    # parse year, month, day, hour, minute out of the file name
    d = [name[0:4], name[4:6], name[6:8], name[9:11], name[11:13]]
    f_zip = gzip.decompress(file)
    yr = d[0]
    mo = d[1]
    da = d[2]
    hr = d[3]
    mn = d[4]
    fs = s3fs.S3FileSystem(anon=True)
    period = pd.Period(str(yr)+'-'+str(mo)+'-'+str(da), freq='D')
    # period.dayofyear
    dy = period.dayofyear
    cc = [7, 8, 9, 10, 11, 12, 13, 14, 15, 16]  # look at the IR channels only for now
    dat = xr.open_dataset(f_zip)
    dd = dat[['recNum', 'trackLat', 'trackLon', 'temp']]
    dd = dd.to_dataframe()
    dd = dd.dropna()
    dd['num'] = np.arange(len(dd))
    l = dd.where((dd.trackLat > -50.0) & (dd.trackLat < 50.0) & (dd.trackLon > -110.0) & (dd.trackLon < 10.0))
    l = l.dropna()
    l.reset_index()
    dy = "{0:0=3d}".format(dy)
    # opening GOES data from S3
    F = xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str(hr)+'/'+'OR_ABI-L1b-RadF-M3C07'+'*')[int(mn)//15]))
    # converting lat/lon to scan angles
    req = F['goes_imager_projection'].semi_major_axis
    oneovf = F['goes_imager_projection'].inverse_flattening
    rpol = F['goes_imager_projection'].semi_minor_axis
    e = 0.0818191910435
    sat_h = F['goes_imager_projection'].perspective_point_height
    H = req + sat_h
    gc = np.deg2rad(F['goes_imager_projection'].longitude_of_projection_origin)
    phi = np.deg2rad(l.trackLat.values)
    gam = np.deg2rad(l.trackLon.values)
    phic = np.arctan((rpol**2/req**2)*np.tan(phi))
    rc = rpol/np.sqrt((1-e**2*np.cos(phic)**2))
    sx = H - rc*np.cos(phic)*np.cos(gam-gc)
    sy = -rc*np.cos(phic)*np.sin(gam-gc)
    sz = rc*np.sin(phic)
    yy = np.arctan(sz/sx)
    xx = np.arcsin(-sy/(np.sqrt(sx**2+sy**2+sz**2)))
    for i in range(len(xx)):
        for c in range(len(cc)):  # fixed: was `range(len(ch):`, a syntax error
            ch = "{0:0=2d}".format(cc[c])
            F1 = xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str(hr)+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[0]))
            F2 = xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str("{0:0=2d}".format(hr))+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[-1]))
            G1 = F1.where((F1.x >= (xx[i]-0.005)) & (F1.x <= (xx[i]+0.005)) & (F1.y >= (yy[i]-0.005)) & (F1.y <= (yy[i]+0.005)), drop=True)
            G2 = F2.where((F2.x >= (xx[i]-0.005)) & (F2.x <= (xx[i]+0.005)) & (F2.y >= (yy[i]-0.005)) & (F2.y <= (yy[i]+0.005)), drop=True)
            G = xr.concat([G1, G2], dim='time')
            G = G.assign_coords(channel=(ch))
            if c == 0:
                T = G
            else:
                T = xr.concat([T, G], dim='channel')
        T = T.assign_coords(temp=(str(l['temp'][i])))
        print(l.iloc[i]['num'])
        path = name+'_'+str(int(l.iloc[i]['num']))+'.nc'
        T.to_netcdf(path)
        # zipping the file
        with zipfile.ZipFile(name+'_'+str(int(l.iloc[i]['num']))+'.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf:
            zf.write(path, arcname=str(name+'_'+str(int(l.iloc[i]['num']))+'.nc'))
        # storing it to S3 (s3 and BUCKET are globals created with boto3 elsewhere in the notebook)
        s3.Bucket(BUCKET).upload_file(path[:-3]+'.zip', "Output/" + path[:-3]+'.zip')
This is how I am fetching the data from S3:
s3 = boto3.resource('s3')
s3client = boto3.client(
    's3',
    region_name='us-east-1'
)
bucketname = s3.Bucket('temp')
filedata = []
keys = []
names = []
for my_bucket_object in bucketname.objects.all():
    keys.append(my_bucket_object.key)
for i in range(1, 21):
    fileobj = s3client.get_object(
        Bucket='temp',
        Key=(keys[i]))
    filedata.append(fileobj['Body'].read())
    names.append(keys[i][10:-3])
Initially, I just want to run it for 20 files for testing purposes.
Here are the dask delayed and compute calls I am creating:
temp_files = []
for i in range(20):
    s3_ds = dask.delayed(get_temp)(filedata[i], names[i])
    temp_files.append(s3_ds)
temp_files = dask.compute(*temp_files)
Here is the complete error log:
distributed.protocol.pickle - INFO - Failed to serialize <function get_temp at 0x7f20a9cb8550>. Exception: cannot pickle '_thread.lock' object
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_function(func)
3319 with _cache_lock:
-> 3320 result = cache_dumps[func]
3321 except KeyError:
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py in __getitem__(self, key)
1572 def __getitem__(self, key):
-> 1573 value = super().__getitem__(key)
1574 self.data.move_to_end(key)
/srv/conda/envs/notebook/lib/python3.8/collections/__init__.py in __getitem__(self, key)
1009 return self.__class__.__missing__(self, key)
-> 1010 raise KeyError(key)
1011 def __setitem__(self, key, item): self.data[key] = item
KeyError: <function get_temp at 0x7f20a9cb8550>
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
52 buffers.clear()
---> 53 result = cloudpickle.dumps(x, **dump_kwargs)
54 elif not _always_use_pickle_for(x) and b"__main__" in result:
/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
562 try:
--> 563 return Pickler.dump(self, obj)
564 except RuntimeError as e:
TypeError: cannot pickle '_thread.lock' object
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input-77-fa46004f5919> in <module>
----> 1 temp_files = dask.compute(*temp_files)
/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
450 postcomputes.append(x.__dask_postcompute__())
451
--> 452 results = schedule(dsk, keys, **kwargs)
453 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
454
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2703 Client.compute: Compute asynchronous collections
2704 """
-> 2705 futures = self._graph_to_futures(
2706 dsk,
2707 keys=set(flatten([keys])),
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
2639 {
2640 "op": "update-graph",
-> 2641 "tasks": valmap(dumps_task, dsk),
2642 "dependencies": dependencies,
2643 "keys": list(map(tokey, keys)),
/srv/conda/envs/notebook/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/srv/conda/envs/notebook/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_task(task)
3356 return d
3357 elif not any(map(_maybe_complex, task[1:])):
-> 3358 return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
3359 return to_serialize(task)
3360
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_function(func)
3320 result = cache_dumps[func]
3321 except KeyError:
-> 3322 result = pickle.dumps(func, protocol=4)
3323 if len(result) < 100000:
3324 with _cache_lock:
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
58 try:
59 buffers.clear()
---> 60 result = cloudpickle.dumps(x, **dump_kwargs)
61 except Exception as e:
62 logger.info("Failed to serialize %s. Exception: %s", x, e)
/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
75
/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
561 def dump(self, obj):
562 try:
--> 563 return Pickler.dump(self, obj)
564 except RuntimeError as e:
565 if "recursion" in e.args[0]:
TypeError: cannot pickle '_thread.lock' object
Can someone help me out here and tell me what I am doing wrong? Is there any other way to do parallel processing besides Dask?
So I found out that the error is thrown only when I upload the file to the S3 bucket; otherwise it works fine. But if I do not save the files to S3, I cannot figure out where they are being stored. When I run it with dask, it saves the files somewhere I cannot find. I am running my code in JupyterLab, and nothing is saved in any directory.
I took some time to parse your code.
In the large function, you use s3fs to interact with your cloud storage, and that works well with xarray.
However, in your main code you use boto3 to list and open the S3 files. Those retain a reference to the client object, which maintains a connection pool. That is the thing that cannot be pickled.
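As a minimal, hedged illustration (not from the original post): trying to pickle a boto3 resource directly fails for much the same reason Dask fails to serialize get_temp, whose globals include the s3 resource used for the final upload.

import pickle

import boto3

s3 = boto3.resource('s3', region_name='us-east-1')
try:
    pickle.dumps(s3)
except Exception as exc:
    # typically raises something like: TypeError: cannot pickle '_thread.lock' object
    print(type(exc).__name__, exc)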
s3fs is designed to work with Dask and ensures that the filesystem instances and OpenFile objects are picklable. Since you already use it in one part of the code, I would recommend using s3fs throughout (but of course I am biased, since I am the main author).
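As a sketch (reusing the bucket name 'temp' and the 20-file test slice from your snippets; the credential setup is an assumption), the boto3 listing and download could be replaced with s3fs like this:

import s3fs

fs = s3fs.S3FileSystem()            # credentials are picked up from the environment

keys = fs.ls('temp')                # object keys, prefixed with the bucket name
filedata = []
names = []
for key in keys[1:21]:              # same 20-file test slice as in the question
    filedata.append(fs.cat(key))    # raw bytes of the object
    names.append(key.split('/', 1)[1][10:-3])   # mimics keys[i][10:-3]; fs.ls includes the bucket prefix

Inside get_temp, the final upload could likewise go through fs.put(local_zip, BUCKET + '/Output/' + local_zip) instead of the global boto3 resource, so that the function no longer refers to any unpicklable object.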
Alternatively, you could pass only the file names (as strings) and not open anything until you are inside the worker function. That would be "best practice": you should load data within the worker tasks rather than loading it in the client and passing the data in.
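A sketch of that approach, using a hypothetical wrapper (process_key, not in your code) that does the download inside the task:

import dask
import s3fs

def process_key(key):
    # everything that is not picklable is created inside the task
    fs = s3fs.S3FileSystem()
    name = key.split('/', 1)[1][10:-3]   # mimics keys[i][10:-3] from the question (assumption)
    file = fs.cat(key)                   # download the bytes on the worker
    return get_temp(file, name)          # reuse the existing processing logic

tasks = [dask.delayed(process_key)(key) for key in keys[1:21]]
results = dask.compute(*tasks)

Note that this only removes the error if get_temp itself no longer references the boto3 globals (s3 and BUCKET), for example after switching the upload to fs.put as above; otherwise cloudpickle will still try to serialize those objects together with the function.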