Dask 似乎正在为全局变量共享内存,我认为这是不可能的
Dask seems to be sharing memory for global variable, which I thought was impossible
以下是一些可重现的代码:
- 这是带有全局变量的 Dask:
# method_file.py
import dask
import random
class TestClass():
def from_dataframe(self, pdf):
global data
data = pdf
@staticmethod
def work(elem):
data.iloc[0, 0] = 9999
return len(data) + elem
def do(self):
tasks = [dask.delayed(TestClass.work)(random.randint(1,500)) for x in range(10)]
re = dask.compute(*tasks)
return re
# main_file.py
from method_file import TestClass
import numpy as np
import pandas as pd
if __name__ == '__main__':
ar = np.arange(500000000).reshape(5000000, 100)
pdf = pd.DataFrame(ar)
tc = TestClass()
tc.from_dataframe(pdf)
print(tc.do())
print(pdf.head(3))
python3 main_file.py
这输出:
(5000117, 5000054, 5000304, 5000111, 5000010, 5000264, 5000201, 5000346, 5000486, 5000376)
0 1 2 3 4 5 6 ... 93 94 95 96 97 98 99
0 9999 1 2 3 4 5 6 ... 93 94 95 96 97 98 99
1 100 101 102 103 104 105 106 ... 193 194 195 196 197 198 199
2 200 201 202 203 204 205 206 ... 293 294 295 296 297 298 299
[3 rows x 100 columns]
这意味着 work
方法能够读取 data
全局变量。不仅如此,它甚至变异了 pdf
变量!我知道 fork
的多处理也可以通过这种方式读取数据,如下所示。
- 这是使用
fork
启动方法的多处理。
# method_file2.py
from multiprocessing import Pool
import multiprocessing
import random
class TestClass():
def from_dataframe(self, pdf):
global data
data = pdf
@staticmethod
def work(elem):
data.iloc[0, 0] = 9999
return len(data) + elem
def do(self):
multiprocessing.set_start_method('fork')
pool = Pool(6)
procs = [pool.apply_async(TestClass.work, args=(random.randint(1,500), )) for i in range(1, 10)]
re = [proc.get() for proc in procs]
return re
# main_file2.py
from method_file2 import TestClass
import numpy as np
import pandas as pd
if __name__ == '__main__':
ar = np.arange(500000000).reshape(5000000, 100)
pdf = pd.DataFrame(ar)
tc = TestClass()
tc.from_dataframe(pdf)
print(tc.do())
print(pdf.head(3))
python3 main_file2.py
这输出:
[5000456, 5000346, 5000122, 5000120, 5000358, 5000067, 5000375, 5000444, 5000288]
0 1 2 3 4 5 6 ... 93 94 95 96 97 98 99
0 0 1 2 3 4 5 6 ... 93 94 95 96 97 98 99
1 100 101 102 103 104 105 106 ... 193 194 195 196 197 198 199
2 200 201 202 203 204 205 206 ... 293 294 295 296 297 298 299
[3 rows x 100 columns]
如您所见,它可以读取,因为复制了主进程的状态,但它不能改变对象 (pdf
)。
为简洁起见,我不会在此处包含更多代码,但我也尝试对 pdf
对象进行 cloudpickling 计时,而且我知道事实上 Dask 并未对该数据框进行 pickling。 Dask怎么可能这样跨进程共享内存?
我没有看到您设置任何进程。 Dask 的默认调度程序使用线程池,因此所有任务都能够看到相同的变量。参见 https://docs.dask.org/en/latest/scheduler-overview.html
以下是一些可重现的代码:
- 这是带有全局变量的 Dask:
# method_file.py
import dask
import random
class TestClass():
def from_dataframe(self, pdf):
global data
data = pdf
@staticmethod
def work(elem):
data.iloc[0, 0] = 9999
return len(data) + elem
def do(self):
tasks = [dask.delayed(TestClass.work)(random.randint(1,500)) for x in range(10)]
re = dask.compute(*tasks)
return re
# main_file.py
from method_file import TestClass
import numpy as np
import pandas as pd
if __name__ == '__main__':
ar = np.arange(500000000).reshape(5000000, 100)
pdf = pd.DataFrame(ar)
tc = TestClass()
tc.from_dataframe(pdf)
print(tc.do())
print(pdf.head(3))
python3 main_file.py
这输出:
(5000117, 5000054, 5000304, 5000111, 5000010, 5000264, 5000201, 5000346, 5000486, 5000376)
0 1 2 3 4 5 6 ... 93 94 95 96 97 98 99
0 9999 1 2 3 4 5 6 ... 93 94 95 96 97 98 99
1 100 101 102 103 104 105 106 ... 193 194 195 196 197 198 199
2 200 201 202 203 204 205 206 ... 293 294 295 296 297 298 299
[3 rows x 100 columns]
这意味着 work
方法能够读取 data
全局变量。不仅如此,它甚至变异了 pdf
变量!我知道 fork
的多处理也可以通过这种方式读取数据,如下所示。
- 这是使用
fork
启动方法的多处理。
# method_file2.py
from multiprocessing import Pool
import multiprocessing
import random
class TestClass():
def from_dataframe(self, pdf):
global data
data = pdf
@staticmethod
def work(elem):
data.iloc[0, 0] = 9999
return len(data) + elem
def do(self):
multiprocessing.set_start_method('fork')
pool = Pool(6)
procs = [pool.apply_async(TestClass.work, args=(random.randint(1,500), )) for i in range(1, 10)]
re = [proc.get() for proc in procs]
return re
# main_file2.py
from method_file2 import TestClass
import numpy as np
import pandas as pd
if __name__ == '__main__':
ar = np.arange(500000000).reshape(5000000, 100)
pdf = pd.DataFrame(ar)
tc = TestClass()
tc.from_dataframe(pdf)
print(tc.do())
print(pdf.head(3))
python3 main_file2.py
这输出:
[5000456, 5000346, 5000122, 5000120, 5000358, 5000067, 5000375, 5000444, 5000288]
0 1 2 3 4 5 6 ... 93 94 95 96 97 98 99
0 0 1 2 3 4 5 6 ... 93 94 95 96 97 98 99
1 100 101 102 103 104 105 106 ... 193 194 195 196 197 198 199
2 200 201 202 203 204 205 206 ... 293 294 295 296 297 298 299
[3 rows x 100 columns]
如您所见,它可以读取,因为复制了主进程的状态,但它不能改变对象 (pdf
)。
为简洁起见,我不会在此处包含更多代码,但我也尝试对 pdf
对象进行 cloudpickling 计时,而且我知道事实上 Dask 并未对该数据框进行 pickling。 Dask怎么可能这样跨进程共享内存?
我没有看到您设置任何进程。 Dask 的默认调度程序使用线程池,因此所有任务都能够看到相同的变量。参见 https://docs.dask.org/en/latest/scheduler-overview.html