python如何将导入的模块封装成多线程的方法?

How to encapsulate an imported module into a method for multithreading in python?

我是 python 的新手,在使用导入库的内部函数时遇到并发问题。问题是我的代码计算了不同种类的变量,在最后一个过程中它们被保存到不同的文件中。但是我在读写时遇到了同样的问题

这是一个有效的示例代码,因为它是线性的:

import xarray as xr

def read_concurrent_files(self):

    files_var_type1 = get_files('type1','20200101','20200127')
    files_var_type2 = get_files('type2','20200101','20200127')
    files_var_type3 = get_files('type3','20200101','20200127')

def get_files(self, varType, dateini, datefin):

    # This methods return an array of file paths
    files = self.get_file_list(varType, dateini, datefin)
    files_raw = xr.open_mfdataset(files , engine='cfgrib', \
        combine='nested', concat_dim ='time', decode_coords = False, parallel = True)      
    return files_raw

但是当我对代码进行这些更改以使其并发时,它失败了:

import xarray as xr
from multiprocessing.pool import ThreadPool

def read_concurrent_files(self):

    pool = ThreadPool(processes=3)

    async_result1 = pool.apply_async(self.get_files, ('type1','20200101','20200127',))
    async_result2 = pool.apply_async(self.get_files, ('type2','20200101','20200127',))
    async_result3 = pool.apply_async(self.get_files, ('type3','20200101','20200127',))

    files_var_type1 = async_result1.get()
    files_var_type2 = async_result2.get()
    files_var_type3 = async_result3.get()

def get_files(self, varType, dateini, datefin):

    # This methods return an array of file paths
    files = self.get_file_list(varType, dateini, datefin)
    files_raw = xr.open_mfdataset(files , engine='cfgrib', \
        combine='nested', concat_dim ='time', decode_coords = False, parallel = True)      
    return files_raw

问题在于 xr.open_mfdataset 调用不是 ThreadSafe(或者我认为是)。

有没有办法将导入库只封装到方法作用域中?

我来自其他语言,在方法中创建实例或使用 ThreadSafe 对象很容易。

提前致谢!!

因为我是 python 的新手,所以我不知道我们可以创建不同类型的线程,所以在我上面的示例中,我使用了可以被 GIL 锁定的线程池(全局解释器锁),所以为了避免它,我们可以使用另一种线程,这里有一个例子:

import os
import concurrent.futures

def get_xarray(self):
    tasks = []
    cpu_count = os.cpu_count()
    with concurrent.futures.ProcessPoolExecutor(max_workers = cpu_count) as executor:
        for i in range(0, len(self.files)):
            tasks.append(executor.submit(self.get_xarray_by_file, self.files[i]))

    results = []
    for result in tasks:
        results.append(result.result())
    era_raw = xr.merge(results, compat='override')

    return era_raw.persist().load()

def get_xarray_by_file(self, files):
    era_raw = xr.open_mfdataset(files , engine='cfgrib', \
        combine='nested', concat_dim ='time', decode_coords = False, parallel = True)
    return era_raw.persist().load()

在这种情况下,我们使用 ProcessPoolExecutor:

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only pickable objects can be executed and returned.

现在我们可以并行读取 grib2 文件,或者从数据帧中实时并行创建 nc 或 csv 文件。