Reading pandas from disk during concurrent process pool
I've written a CLI tool to generate simulations, and I'm hoping to generate roughly 10k of them (~10 minutes) for each cut of the ~200 datasets I have. I have functions that do this just fine in a for loop, but when I converted it to use concurrent.futures.ProcessPoolExecutor(), I found that multiple processes can't read in the same pandas DataFrame.
Here's the smallest example I could come up with:
import concurrent.futures
import pandas as pd

def example():
    # This is a static table with basic information like distributions
    df = pd.read_parquet("batch/data/mappings.pq")
    # Then there's a bunch of etl, even reading in a few other static tables
    return sum(df.shape)

def main():
    results = []
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futr_results = [pool.submit(example) for _ in range(100)]
        done_results = concurrent.futures.as_completed(futr_results)
        for _ in futr_results:
            results.append(next(done_results).result())
    return results

if __name__ == "__main__":
    print(main())
Error:
<jemalloc>: background thread creation failed (11)
terminate called after throwing an instance of 'std::system_error'
what(): Resource temporarily unavailable
Traceback (most recent call last):
File "batch/testing.py", line 19, in <module>
main()
File "batch/testing.py", line 14, in main
results.append(next(done_results).result())
File "/home/a114383/miniconda3/envs/hailsims/lib/python3.7/concurrent/futures/_base.py", line 425, in result
return self.__get_result()
File "/home/a114383/miniconda3/envs/hailsims/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
I was hoping there would be a quick and dirty way to read these in (with no reference held, I guess?), but otherwise it looks like I need to build all the arguments up front rather than fetching them on the fly - that fallback is sketched below.
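For concreteness, here's a rough sketch of that fallback (my assumption: the DataFrame pickles cleanly, and ProcessPoolExecutor serializes it to the worker on every submit):
import concurrent.futures
import pandas as pd

def example(df):
    # df arrives in the worker as a pickled copy
    return sum(df.shape)

def main():
    # Read the static table once, up front, in the parent process
    df = pd.read_parquet("batch/data/mappings.pq")
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = [pool.submit(example, df) for _ in range(100)]
        return [fut.result() for fut in concurrent.futures.as_completed(futures)]

if __name__ == "__main__":
    print(main())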
Three things I'd try:
Pandas has an option to use either PyArrow or FastParquet when reading parquet files. Try switching to whichever one you aren't currently using - this smells like a bug in one of the engines (see the sketch below).
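A minimal sketch of that switch (the engine keyword is how pandas selects the parquet backend; fastparquet has to be installed separately):
import pandas as pd

# The default engine="auto" prefers pyarrow when it is installed; forcing
# fastparquet rules out a pyarrow-specific problem (pip install fastparquet).
df = pd.read_parquet("batch/data/mappings.pq", engine="fastparquet")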
Try forcing pandas to open the file in read-only mode, to prevent conflicts from the file being locked:
pd.read_parquet(open("batch/data/mappings.pq", "rb"))
# Parquet is a binary format, so "rb" is the mode pandas expects here;
# text mode ("r") would try to decode the bytes and fail
Try loading the file into a StringIO/BytesIO buffer first and handing that to pandas - this keeps pandas from interacting with the file on disk at all:
import io

# BytesIO is the right buffer here: parquet is a binary format
data = io.BytesIO(open("batch/data/mappings.pq", "rb").read())
# StringIO would try to decode the bytes as text, which fails for parquet:
# data = io.StringIO(open("batch/data/mappings.pq", "r").read())
pd.read_parquet(data)
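If the buffer route works, one variation worth trying (my own sketch, combining it with the pool from the question): read the raw bytes once in the parent and ship them to each worker, so no child process ever touches the disk:
import concurrent.futures
import io
import pandas as pd

def example(raw):
    # Each worker rebuilds an in-memory buffer from the pickled bytes
    df = pd.read_parquet(io.BytesIO(raw))
    return sum(df.shape)

def main():
    # Hit the disk exactly once, in the parent process
    with open("batch/data/mappings.pq", "rb") as f:
        raw = f.read()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = [pool.submit(example, raw) for _ in range(100)]
        return [fut.result() for fut in concurrent.futures.as_completed(futures)]

if __name__ == "__main__":
    print(main())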