python如何将导入的模块封装成多线程的方法?
How to encapsulate an imported module into a method for multithreading in python?
我是 python 的新手,在使用导入库的内部函数时遇到并发问题。问题是我的代码计算了不同种类的变量,在最后一个过程中它们被保存到不同的文件中。但是我在读写时遇到了同样的问题
这是一个有效的示例代码,因为它是线性的:
import xarray as xr
def read_concurrent_files(self):
    """Fetch the three variable types one after another (sequential version).

    Calls ``self.get_files`` once per variable type over the fixed date
    range 2020-01-01 .. 2020-01-27 and binds each resulting dataset to a
    local variable.
    """
    # Bug fix: get_files is declared as a method (it takes self), so it
    # must be called through self; a bare get_files(...) raises NameError.
    files_var_type1 = self.get_files('type1', '20200101', '20200127')
    files_var_type2 = self.get_files('type2', '20200101', '20200127')
    files_var_type3 = self.get_files('type3', '20200101', '20200127')
def get_files(self, varType, dateini, datefin):
    """Open every file of *varType* between *dateini* and *datefin*.

    Resolves the concrete file paths via ``self.get_file_list`` and opens
    them as a single dataset, concatenated along the 'time' dimension.
    """
    paths = self.get_file_list(varType, dateini, datefin)
    return xr.open_mfdataset(
        paths,
        engine='cfgrib',
        combine='nested',
        concat_dim='time',
        decode_coords=False,
        parallel=True,
    )
但是当我对代码进行这些更改以使其并发时,它失败了:
import xarray as xr
from multiprocessing.pool import ThreadPool
def read_concurrent_files(self):
    """Fetch the three variable types concurrently on a thread pool.

    Submits one ``self.get_files`` call per variable type and then blocks
    on each result in submission order.
    """
    # Fix: the original never closed the pool, leaking worker threads.
    # The context manager terminates the pool deterministically on exit.
    with ThreadPool(processes=3) as pool:
        async_result1 = pool.apply_async(self.get_files, ('type1', '20200101', '20200127'))
        async_result2 = pool.apply_async(self.get_files, ('type2', '20200101', '20200127'))
        async_result3 = pool.apply_async(self.get_files, ('type3', '20200101', '20200127'))
        files_var_type1 = async_result1.get()
        files_var_type2 = async_result2.get()
        files_var_type3 = async_result3.get()
def get_files(self, varType, dateini, datefin):
    """Load the files for *varType* within [dateini, datefin] as one dataset."""
    open_kwargs = dict(
        engine='cfgrib',
        combine='nested',
        concat_dim='time',
        decode_coords=False,
        parallel=True,
    )
    # self.get_file_list resolves the date range to concrete file paths.
    file_paths = self.get_file_list(varType, dateini, datefin)
    files_raw = xr.open_mfdataset(file_paths, **open_kwargs)
    return files_raw
问题在于 xr.open_mfdataset 调用不是线程安全的(至少我是这么认为的)。
有没有办法将导入库只封装到方法作用域中?
我来自其他语言,在方法中创建实例或使用 ThreadSafe 对象很容易。
提前致谢!!
因为我是 python 的新手,所以我不知道我们可以创建不同类型的线程,所以在我上面的示例中,我使用了可以被 GIL 锁定的线程池(全局解释器锁),所以为了避免它,我们可以使用另一种线程,这里有一个例子:
import os
import concurrent.futures
def get_xarray(self):
    """Read all of ``self.files`` in parallel worker processes and merge them.

    Each file is opened by ``get_xarray_by_file`` in its own process
    (side-stepping the GIL); the per-file datasets are then merged with
    compat='override' and materialised in memory.
    """
    worker_count = os.cpu_count()
    with concurrent.futures.ProcessPoolExecutor(max_workers=worker_count) as executor:
        futures = [
            executor.submit(self.get_xarray_by_file, path)
            for path in self.files
        ]
        # Collect in submission order; .result() blocks until done.
        datasets = [future.result() for future in futures]
    era_raw = xr.merge(datasets, compat='override')
    return era_raw.persist().load()
def get_xarray_by_file(self, files):
    """Open *files* with the cfgrib engine, concatenated along 'time'.

    Runs inside a worker process; the dataset is fully computed and
    loaded before being returned (pickled back to the parent).
    """
    dataset = xr.open_mfdataset(
        files,
        engine='cfgrib',
        combine='nested',
        concat_dim='time',
        decode_coords=False,
        parallel=True,
    )
    return dataset.persist().load()
在这种情况下,我们使用 ProcessPoolExecutor:
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
现在我们可以并行读取 grib2 文件,或者从数据帧中实时并行创建 nc 或 csv 文件。
我是 python 的新手,在使用导入库的内部函数时遇到并发问题。问题是我的代码计算了不同种类的变量,在最后一个过程中它们被保存到不同的文件中。但是我在读写时遇到了同样的问题
这是一个有效的示例代码,因为它是线性的:
import xarray as xr
def read_concurrent_files(self):
    """Fetch the three variable types one after another (sequential version).

    Calls ``self.get_files`` once per variable type over the fixed date
    range 2020-01-01 .. 2020-01-27 and binds each resulting dataset to a
    local variable.
    """
    # Bug fix: get_files is declared as a method (it takes self), so it
    # must be called through self; a bare get_files(...) raises NameError.
    files_var_type1 = self.get_files('type1', '20200101', '20200127')
    files_var_type2 = self.get_files('type2', '20200101', '20200127')
    files_var_type3 = self.get_files('type3', '20200101', '20200127')
def get_files(self, varType, dateini, datefin):
    """Open every file of *varType* between *dateini* and *datefin*.

    Resolves the concrete file paths via ``self.get_file_list`` and opens
    them as a single dataset, concatenated along the 'time' dimension.
    """
    paths = self.get_file_list(varType, dateini, datefin)
    return xr.open_mfdataset(
        paths,
        engine='cfgrib',
        combine='nested',
        concat_dim='time',
        decode_coords=False,
        parallel=True,
    )
但是当我对代码进行这些更改以使其并发时,它失败了:
import xarray as xr
from multiprocessing.pool import ThreadPool
def read_concurrent_files(self):
    """Fetch the three variable types concurrently on a thread pool.

    Submits one ``self.get_files`` call per variable type and then blocks
    on each result in submission order.
    """
    # Fix: the original never closed the pool, leaking worker threads.
    # The context manager terminates the pool deterministically on exit.
    with ThreadPool(processes=3) as pool:
        async_result1 = pool.apply_async(self.get_files, ('type1', '20200101', '20200127'))
        async_result2 = pool.apply_async(self.get_files, ('type2', '20200101', '20200127'))
        async_result3 = pool.apply_async(self.get_files, ('type3', '20200101', '20200127'))
        files_var_type1 = async_result1.get()
        files_var_type2 = async_result2.get()
        files_var_type3 = async_result3.get()
def get_files(self, varType, dateini, datefin):
    """Load the files for *varType* within [dateini, datefin] as one dataset."""
    open_kwargs = dict(
        engine='cfgrib',
        combine='nested',
        concat_dim='time',
        decode_coords=False,
        parallel=True,
    )
    # self.get_file_list resolves the date range to concrete file paths.
    file_paths = self.get_file_list(varType, dateini, datefin)
    files_raw = xr.open_mfdataset(file_paths, **open_kwargs)
    return files_raw
问题在于 xr.open_mfdataset 调用不是线程安全的(至少我是这么认为的)。
有没有办法将导入库只封装到方法作用域中?
我来自其他语言,在方法中创建实例或使用 ThreadSafe 对象很容易。
提前致谢!!
因为我是 python 的新手,所以我不知道我们可以创建不同类型的线程,所以在我上面的示例中,我使用了可以被 GIL 锁定的线程池(全局解释器锁),所以为了避免它,我们可以使用另一种线程,这里有一个例子:
import os
import concurrent.futures
def get_xarray(self):
    """Read all of ``self.files`` in parallel worker processes and merge them.

    Each file is opened by ``get_xarray_by_file`` in its own process
    (side-stepping the GIL); the per-file datasets are then merged with
    compat='override' and materialised in memory.
    """
    worker_count = os.cpu_count()
    with concurrent.futures.ProcessPoolExecutor(max_workers=worker_count) as executor:
        futures = [
            executor.submit(self.get_xarray_by_file, path)
            for path in self.files
        ]
        # Collect in submission order; .result() blocks until done.
        datasets = [future.result() for future in futures]
    era_raw = xr.merge(datasets, compat='override')
    return era_raw.persist().load()
def get_xarray_by_file(self, files):
    """Open *files* with the cfgrib engine, concatenated along 'time'.

    Runs inside a worker process; the dataset is fully computed and
    loaded before being returned (pickled back to the parent).
    """
    dataset = xr.open_mfdataset(
        files,
        engine='cfgrib',
        combine='nested',
        concat_dim='time',
        decode_coords=False,
        parallel=True,
    )
    return dataset.persist().load()
在这种情况下,我们使用 ProcessPoolExecutor:
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
现在我们可以并行读取 grib2 文件,或者从数据帧中实时并行创建 nc 或 csv 文件。