Intermediate results from joblib
I am trying to learn the joblib module as a replacement for Python's built-in multiprocessing module. I am used to running a function over an iterable with multiprocessing.imap and getting the results back as they are produced. In this minimal working example, I cannot figure out how to do that with joblib:
import joblib, time

def hello(n):
    time.sleep(1)
    print "Inside function", n
    return n

with joblib.Parallel(n_jobs=1) as MP:
    func = joblib.delayed(hello)
    for x in MP(func(x) for x in range(3)):
        print "Outside function", x
This prints:
Inside function 0
Inside function 1
Inside function 2
Outside function 0
Outside function 1
Outside function 2
I would like to see output like:
Inside function 0
Outside function 0
Inside function 1
Outside function 1
Inside function 2
Outside function 2
or something similar, indicating that the iterable MP(...) does not wait for all the results to finish. For a longer demonstration, change to n_jobs=-1 and range(100).
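For reference, the multiprocessing.imap pattern I am used to looks roughly like this (a sketch of my own, written with Python 3 print syntax; the numbers and pool size are just illustrative):

import multiprocessing, time

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

if __name__ == "__main__":
    # imap yields each result in order, as soon as it is available,
    # so "Outside function" lines interleave with "Inside function" lines.
    with multiprocessing.Pool(processes=1) as pool:
        for x in pool.imap(hello, range(3)):
            print("Outside function", x)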
>>> import joblib, time
>>>
>>> def hello(n):
...     time.sleep(1)
...     print "Inside function", n
...     return n
...
>>> with joblib.Parallel(n_jobs=1) as MP:
...     func = joblib.delayed(hello)
...     res = MP(func(x) for x in range(3))  # This is not an iterator.
...
Inside function 0
Inside function 1
Inside function 2
>>> type(res)
<type 'list'>
You are not dealing with a generator, so you should not expect it to give you intermediate results. Nothing I have read in the documentation seems to mention this (or I have not read the relevant part).
You are welcome to search the documentation for the topic of "intermediate" results:
https://pythonhosted.org/joblib/search.html?q=intermediate&check_keywords=yes&area=default
My understanding is that each call to Parallel acts as a barrier; to get intermediate results, you have to process the work in chunks:
>>> import joblib, time
>>>
>>> def hello(n):
...     time.sleep(1)
...     print "Inside function", n
...     return n
...
>>> with joblib.Parallel(n_jobs=1) as MP:
...     func = joblib.delayed(hello)
...     for chunk in range(3):
...         x = MP(func(y) for y in [chunk])
...         print "Outside function", x
...
Inside function 0
Outside function [0]
Inside function 1
Outside function [1]
Inside function 2
Outside function [2]
>>>
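The single-element chunks above make the barrier visible but leave workers idle; the same idea works with larger chunks. A rough sketch in Python 3 syntax (the chunk size and task list below are only illustrative, not part of the original example):

import joblib, time

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

n_jobs = 2
tasks = list(range(6))

with joblib.Parallel(n_jobs=n_jobs) as MP:
    func = joblib.delayed(hello)
    # Each MP(...) call is a barrier, so results become available
    # chunk by chunk instead of all at the end.
    for start in range(0, len(tasks), n_jobs):
        chunk = tasks[start:start + n_jobs]
        for x in MP(func(y) for y in chunk):
            print("Outside function", x)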
If you want to get technical, there is a callback mechanism, but it is dedicated to progress reporting (BatchCompletionCallBack), and using it requires more involved code changes.
To get immediate results from joblib, for example:
import random, time
import joblib
from joblib import delayed
from joblib._parallel_backends import MultiprocessingBackend

def hello(n):
    time.sleep(n * random.randrange(1, 5))
    print("Inside function", n)
    return n

class ImmediateResult_Backend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % (result))

    # Overload apply_async and set callback=self.callback
    def apply_async(self, func, callback=None):
        applyResult = super().apply_async(func, self.callback)
        return applyResult

joblib.register_parallel_backend('custom', ImmediateResult_Backend, make_default=True)

with joblib.Parallel(n_jobs=2) as parallel:
    func = parallel(delayed(hello)(y) for y in range(3))
    for f in func:
        print("Outside function %s" % (f))
Output:
Note: I used time.sleep(n * random.randrange(1, 5)) in def hello(...) so that the processes finish at different times.
Inside function 0
Inside function 1
ImmediateResult function [0]
Inside function 2
ImmediateResult function [1]
ImmediateResult function [2]
Outside function 0
Outside function 1
Outside function 2
Tested with Python 3.4.2 and joblib 0.11.
stovfl's answer is elegant, but it only works for the first batch that is dispatched. In the example it happens to work because the workers never starve (n_tasks < 2*n_jobs). For this approach to work in general, the callback originally passed to apply_async must also be called: it is an instance of BatchCompletionCallBack, which schedules the next batch of tasks to be processed.
One possible solution is to wrap arbitrary callbacks in a callable object, like this (tested with joblib==0.11, Python 3.6):
from joblib._parallel_backends import MultiprocessingBackend
from joblib import register_parallel_backend, parallel_backend
from joblib import Parallel, delayed
import time

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

class ImmediateResultBackend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % result)

    def apply_async(self, func, callback=None):
        cbs = MultiCallback(callback, self.callback)
        return super().apply_async(func, cbs)

register_parallel_backend('custom', ImmediateResultBackend)

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

with parallel_backend('custom'):
    res = Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))
Output:
Inside function 0
Inside function 1
ImmediateResult function [0]
ImmediateResult function [1]
Inside function 3
Inside function 2
ImmediateResult function [3]
ImmediateResult function [2]
Inside function 4
ImmediateResult function [4]
Inside function 5
ImmediateResult function [5]
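If you want to do more than print inside the callback, the same pattern lets you act on each finished batch in the parent process. A rough sketch (the CheckpointingBackend name and the completed list are mine, only for illustration, not part of the joblib API):

import time
from joblib._parallel_backends import MultiprocessingBackend
from joblib import register_parallel_backend, parallel_backend
from joblib import Parallel, delayed

# The callback runs in the parent process, so a plain module-level
# list is enough to collect batches as they complete.
completed = []

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

class CheckpointingBackend(MultiprocessingBackend):
    def callback(self, result):
        # Called as soon as a batch finishes; result is the list of
        # return values for that batch.
        completed.append((time.time(), result))

    def apply_async(self, func, callback=None):
        # Keep the original BatchCompletionCallBack so scheduling
        # continues past the first batch.
        return super().apply_async(func, MultiCallback(callback, self.callback))

register_parallel_backend('checkpointing', CheckpointingBackend)

def hello(n):
    time.sleep(1)
    return n

with parallel_backend('checkpointing'):
    Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))

for t, batch in completed:
    print(t, batch)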