如何使用 theano.op 在 pymc3 中编写自定义确定性或随机性?
How to write a custom Deterministic or Stochastic in pymc3 with theano.op?
我正在做一些 pymc3,我想创建自定义随机指标,但是似乎没有很多关于它是如何完成的文档。我知道如何使用 as_op way,但显然这使得无法使用 NUTS 采样器,在这种情况下我看不到 pymc3 相对于 pymc 的优势。
教程中提到可以通过继承theano.Op来完成。但是谁能告诉我它是如何工作的(我还在开始使用 theano)?我有两个要定义的随机指标。
第一个应该更简单,它是一个 N 维向量 F
,只有常量父变量:
with myModel:
F = DensityDist('F', lambda value: pymc.skew_normal_like(value, F_mu_array, F_std_array, F_a_array), shape = N)
我想要一个偏正态分布,pymc3好像还没有实现,我刚导入了pymc2版本。不幸的是,F_mu_array, F_std_array, F_a_array and F
都是 N 维向量,而 lambda 似乎不适用于 N 维列表 value
。
首先,有没有办法让lambda输入为N维数组?如果没有,我想我需要直接定义随机指标 F
,这就是我认为我需要 theano.Op 才能使其工作的地方。
第二个例子是其他Stochastics的一个更复杂的函数。在这里我想如何定义它(目前不正确):
with myModel:
ln2_var = Uniform('ln2_var', lower=-10, upper=4)
sigma = Deterministic('sigma', exp(0.5*ln2_var))
A = Uniform('A', lower=-10, upper=10, shape=5)
C = Uniform('C', lower=0.0, upper=2.0, shape=5)
sw = Normal('sw', mu=5.5, sd=0.5, shape=5)
# F from before
F = DensityDist('F', lambda value: skew_normal_like(value, F_mu_array, F_std_array, F_a_array), shape = N)
M = Normal('M', mu=M_obs_array, sd=M_stdev, shape=N)
# Radius forward-model (THIS IS THE STOCHASTIC IN QUESTION)
R = Normal('R', mu = R_forward(F, M, A, C, sw, N), sd=sigma, shape=N)
其中函数 R_forward(F,M,A,C,sw,N)
被天真地定义为:
from theano.tensor import lt, le, eq, gt, ge
def R_forward(Flux, Mass, A, C, sw, num):
for i in range(num):
if lt(Mass[i], 0.2):
if lt(Flux[i], sw[0]):
muR = C[0]
else:
muR = A[0]*log10(Flux[i]) + C[0] - A[0]*log10(sw[0])
elif (le(0.2, Mass[i]) or le(Mass[i], 0.5)):
if lt(Flux[i], sw[1]):
muR = C[1]
else:
muR = A[1]*log10(Flux[i]) + C[1] - A[1]*log10(sw[1])
elif (le(0.5, Mass[i]) or le(Mass[i], 1.5)):
if lt(Flux[i], sw[2]):
muR = C[2]
else:
muR = A[2]*log10(Flux[i]) + C[2] - A[2]*log10(sw[2])
elif (le(1.5, Mass[i]) or le(Mass[i], 3.5)):
if lt(Flux[i], sw[3]):
muR = C[3]
else:
muR = A[3]*log10(Flux[i]) + C[3] - A[3]*log10(sw[3])
else:
if lt(Flux[i], sw[4]):
muR = C[4]
else:
muR = A[4]*log10(Flux[i]) + C[4] - A[4]*log10(sw[4])
return muR
这当然是行不通的。我知道如何使用 as_op
,但我想保留 NUTS 采样。
我意识到现在有点晚了,但我想我还是会(相当含糊地)回答这个问题。
如果你想定义一个随机函数(例如概率分布),那么你需要做几件事:
首先,定义离散 (pymc3.distributions.Discrete) 或连续的子class,它至少具有方法 logp,returns 随机的对数似然。如果您将其定义为一个简单的符号方程 (x+1),我相信您不需要处理任何梯度(但不要在这方面引用我的话;see the documentation about this)。我将在下面处理更复杂的案例。在不幸的情况下,你需要做任何更复杂的事情,如你的第二个例子(顺便说一下,pymc3 现在实现了偏态正态分布),你需要定义它所需的操作(在 logp 方法中使用)作为Theano Op。如果你不需要导数,那么 as_op 就可以了,但正如你所说,梯度是 pymc3 的一种想法。
这就是事情变得复杂的地方。如果您想使用 NUTS(或出于任何原因需要梯度),那么您需要将 logp 中使用的操作实现为 theano.gof.Op 的子 class。您的新 op class(我们从现在起就称它为 Op)至少需要两个或三个方法。第一个将 inputs/outputs 定义为 Op (check the Op documentation)。 perform() 方法(或您可能选择的变体)是执行您想要的操作的方法(例如,您的 R_forward 函数)。如果您愿意,这可以在纯 python 中完成。第三种方法 grad() 是您定义 perform() 输出与输入的梯度的地方。 grad() 的实际输出有点不同,但没什么大不了的。
正是在 grad() 中,使用 Theano 得到了回报。如果您在 Theano 中定义了整个 perform(),那么您可能可以轻松地使用自动微分(theano.tensor.grad 或 theano.tensor.jacobian)为您完成工作(参见下面的示例)。然而,这并不一定容易。
在你的第二个例子中,这意味着在 Theano 中实现你的 R_forward 函数,这可能很复杂。
在这里,我包含了一个我在学习做这些事情时创建的 Op 的最小示例。
def my_th_fun():
""" Some needed auxiliary functions.
"""
X = th.tensor.vector('X')
SCALE = th.tensor.scalar('SCALE')
X.tag.test_value = np.array([1,2,3,4])
SCALE.tag.test_value = 5.
Scale, upd_sm_X = th.scan(lambda x, scale: scale*(scale+ x),
sequences=[X],
outputs_info=[SCALE])
fun_Scale = th.function(inputs=[X, SCALE], outputs=Scale)
D_out_d_scale = th.tensor.grad(Scale[-1], SCALE)
fun_d_out_d_scale = th.function([X, SCALE], D_out_d_scale)
return Scale, fun_Scale, D_out_d_scale, fun_d_out_d_scale
class myOp(th.gof.Op):
""" Op subclass with a somewhat silly computation. It uses
th.scan and th.tensor.grad is used to calculate the gradient
automagically in the grad() method.
"""
__props__ = ()
itypes = [th.tensor.dscalar]
otypes = [th.tensor.dvector]
def __init__(self, *args, **kwargs):
super(myOp, self).__init__(*args, **kwargs)
self.base_dist = np.arange(1,5)
(self.UPD_scale, self.fun_scale,
self.D_out_d_scale, self.fun_d_out_d_scale)= my_th_fun()
def perform(self, node, inputs, outputs):
scale = inputs[0]
updated_scale = self.fun_scale(self.base_dist, scale)
out1 = self.base_dist[0:2].sum()
out2 = self.base_dist[2:4].sum()
maxout = np.max([out1, out2])
exp_out1 = np.exp(updated_scale[-1]*(out1-maxout))
exp_out2 = np.exp(updated_scale[-1]*(out2-maxout))
norm_const = exp_out1 + exp_out2
outputs[0][0] = np.array([exp_out1/norm_const, exp_out2/norm_const])
def grad(self, inputs, output_gradients): #working!
""" Calculates the gradient of the output of the Op wrt
to the input. As a simple example, the input is scalar.
Notice how the output is actually the gradient multiplied
by the output_gradients, which is an input provided by
theano when calculating gradients.
"""
scale = inputs[0]
X = th.tensor.as_tensor(self.base_dist)
# Do I need to recalculate all this or can I assume that perform() has
# always been called before grad() and thus can take it from there?
# In any case, this is a small enough example to recalculate quickly:
all_scale, _ = th.scan(lambda x, scale_1: scale_1*(scale_1+ x),
sequences=[X],
outputs_info=[scale])
updated_scale = all_scale[-1]
out1 = self.base_dist[0:1].sum()
out2 = self.base_dist[2:3].sum()
maxout = np.max([out1, out2])
exp_out1 = th.tensor.exp(updated_scale*(out1 - maxout))
exp_out2 = th.tensor.exp(updated_scale*(out2 - maxout))
norm_const = exp_out1 + exp_out2
d_S_d_scale = th.theano.grad(all_scale[-1], scale)
Jac1 = (-(out1-out2)*d_S_d_scale*
th.tensor.exp(updated_scale*(out1+out2 - 2*maxout))/(norm_const**2))
Jac2 = -Jac1
return Jac1*output_gradients[0][0]+ Jac2*output_gradients[0][1],
然后可以在 pymc3 中的随机函数的 logp() 方法中使用此 Op:
import pymc3 as pm
class myDist(pm.distributions.Discrete):
def __init__(self, invT, *args, **kwargs):
super(myDist, self).__init__(*args, **kwargs)
self.invT = invT
self.myOp = myOp()
def logp(self, value):
return self.myOp(self.invT)[value]
我希望它能帮助任何(绝望的)pymc3/theano 新手。
我正在做一些 pymc3,我想创建自定义随机指标,但是似乎没有很多关于它是如何完成的文档。我知道如何使用 as_op way,但显然这使得无法使用 NUTS 采样器,在这种情况下我看不到 pymc3 相对于 pymc 的优势。
教程中提到可以通过继承theano.Op来完成。但是谁能告诉我它是如何工作的(我还在开始使用 theano)?我有两个要定义的随机指标。
第一个应该更简单,它是一个 N 维向量 F
,只有常量父变量:
with myModel:
F = DensityDist('F', lambda value: pymc.skew_normal_like(value, F_mu_array, F_std_array, F_a_array), shape = N)
我想要一个偏正态分布,pymc3好像还没有实现,我刚导入了pymc2版本。不幸的是,F_mu_array, F_std_array, F_a_array and F
都是 N 维向量,而 lambda 似乎不适用于 N 维列表 value
。
首先,有没有办法让lambda输入为N维数组?如果没有,我想我需要直接定义随机指标 F
,这就是我认为我需要 theano.Op 才能使其工作的地方。
第二个例子是其他Stochastics的一个更复杂的函数。在这里我想如何定义它(目前不正确):
with myModel:
ln2_var = Uniform('ln2_var', lower=-10, upper=4)
sigma = Deterministic('sigma', exp(0.5*ln2_var))
A = Uniform('A', lower=-10, upper=10, shape=5)
C = Uniform('C', lower=0.0, upper=2.0, shape=5)
sw = Normal('sw', mu=5.5, sd=0.5, shape=5)
# F from before
F = DensityDist('F', lambda value: skew_normal_like(value, F_mu_array, F_std_array, F_a_array), shape = N)
M = Normal('M', mu=M_obs_array, sd=M_stdev, shape=N)
# Radius forward-model (THIS IS THE STOCHASTIC IN QUESTION)
R = Normal('R', mu = R_forward(F, M, A, C, sw, N), sd=sigma, shape=N)
其中函数 R_forward(F,M,A,C,sw,N)
被天真地定义为:
from theano.tensor import lt, le, eq, gt, ge
def R_forward(Flux, Mass, A, C, sw, num):
for i in range(num):
if lt(Mass[i], 0.2):
if lt(Flux[i], sw[0]):
muR = C[0]
else:
muR = A[0]*log10(Flux[i]) + C[0] - A[0]*log10(sw[0])
elif (le(0.2, Mass[i]) or le(Mass[i], 0.5)):
if lt(Flux[i], sw[1]):
muR = C[1]
else:
muR = A[1]*log10(Flux[i]) + C[1] - A[1]*log10(sw[1])
elif (le(0.5, Mass[i]) or le(Mass[i], 1.5)):
if lt(Flux[i], sw[2]):
muR = C[2]
else:
muR = A[2]*log10(Flux[i]) + C[2] - A[2]*log10(sw[2])
elif (le(1.5, Mass[i]) or le(Mass[i], 3.5)):
if lt(Flux[i], sw[3]):
muR = C[3]
else:
muR = A[3]*log10(Flux[i]) + C[3] - A[3]*log10(sw[3])
else:
if lt(Flux[i], sw[4]):
muR = C[4]
else:
muR = A[4]*log10(Flux[i]) + C[4] - A[4]*log10(sw[4])
return muR
这当然是行不通的。我知道如何使用 as_op
,但我想保留 NUTS 采样。
我意识到现在有点晚了,但我想我还是会(相当含糊地)回答这个问题。
如果你想定义一个随机函数(例如概率分布),那么你需要做几件事:
首先,定义离散 (pymc3.distributions.Discrete) 或连续的子class,它至少具有方法 logp,returns 随机的对数似然。如果您将其定义为一个简单的符号方程 (x+1),我相信您不需要处理任何梯度(但不要在这方面引用我的话;see the documentation about this)。我将在下面处理更复杂的案例。在不幸的情况下,你需要做任何更复杂的事情,如你的第二个例子(顺便说一下,pymc3 现在实现了偏态正态分布),你需要定义它所需的操作(在 logp 方法中使用)作为Theano Op。如果你不需要导数,那么 as_op 就可以了,但正如你所说,梯度是 pymc3 的一种想法。
这就是事情变得复杂的地方。如果您想使用 NUTS(或出于任何原因需要梯度),那么您需要将 logp 中使用的操作实现为 theano.gof.Op 的子 class。您的新 op class(我们从现在起就称它为 Op)至少需要两个或三个方法。第一个将 inputs/outputs 定义为 Op (check the Op documentation)。 perform() 方法(或您可能选择的变体)是执行您想要的操作的方法(例如,您的 R_forward 函数)。如果您愿意,这可以在纯 python 中完成。第三种方法 grad() 是您定义 perform() 输出与输入的梯度的地方。 grad() 的实际输出有点不同,但没什么大不了的。
正是在 grad() 中,使用 Theano 得到了回报。如果您在 Theano 中定义了整个 perform(),那么您可能可以轻松地使用自动微分(theano.tensor.grad 或 theano.tensor.jacobian)为您完成工作(参见下面的示例)。然而,这并不一定容易。
在你的第二个例子中,这意味着在 Theano 中实现你的 R_forward 函数,这可能很复杂。
在这里,我包含了一个我在学习做这些事情时创建的 Op 的最小示例。
def my_th_fun():
""" Some needed auxiliary functions.
"""
X = th.tensor.vector('X')
SCALE = th.tensor.scalar('SCALE')
X.tag.test_value = np.array([1,2,3,4])
SCALE.tag.test_value = 5.
Scale, upd_sm_X = th.scan(lambda x, scale: scale*(scale+ x),
sequences=[X],
outputs_info=[SCALE])
fun_Scale = th.function(inputs=[X, SCALE], outputs=Scale)
D_out_d_scale = th.tensor.grad(Scale[-1], SCALE)
fun_d_out_d_scale = th.function([X, SCALE], D_out_d_scale)
return Scale, fun_Scale, D_out_d_scale, fun_d_out_d_scale
class myOp(th.gof.Op):
""" Op subclass with a somewhat silly computation. It uses
th.scan and th.tensor.grad is used to calculate the gradient
automagically in the grad() method.
"""
__props__ = ()
itypes = [th.tensor.dscalar]
otypes = [th.tensor.dvector]
def __init__(self, *args, **kwargs):
super(myOp, self).__init__(*args, **kwargs)
self.base_dist = np.arange(1,5)
(self.UPD_scale, self.fun_scale,
self.D_out_d_scale, self.fun_d_out_d_scale)= my_th_fun()
def perform(self, node, inputs, outputs):
scale = inputs[0]
updated_scale = self.fun_scale(self.base_dist, scale)
out1 = self.base_dist[0:2].sum()
out2 = self.base_dist[2:4].sum()
maxout = np.max([out1, out2])
exp_out1 = np.exp(updated_scale[-1]*(out1-maxout))
exp_out2 = np.exp(updated_scale[-1]*(out2-maxout))
norm_const = exp_out1 + exp_out2
outputs[0][0] = np.array([exp_out1/norm_const, exp_out2/norm_const])
def grad(self, inputs, output_gradients): #working!
""" Calculates the gradient of the output of the Op wrt
to the input. As a simple example, the input is scalar.
Notice how the output is actually the gradient multiplied
by the output_gradients, which is an input provided by
theano when calculating gradients.
"""
scale = inputs[0]
X = th.tensor.as_tensor(self.base_dist)
# Do I need to recalculate all this or can I assume that perform() has
# always been called before grad() and thus can take it from there?
# In any case, this is a small enough example to recalculate quickly:
all_scale, _ = th.scan(lambda x, scale_1: scale_1*(scale_1+ x),
sequences=[X],
outputs_info=[scale])
updated_scale = all_scale[-1]
out1 = self.base_dist[0:1].sum()
out2 = self.base_dist[2:3].sum()
maxout = np.max([out1, out2])
exp_out1 = th.tensor.exp(updated_scale*(out1 - maxout))
exp_out2 = th.tensor.exp(updated_scale*(out2 - maxout))
norm_const = exp_out1 + exp_out2
d_S_d_scale = th.theano.grad(all_scale[-1], scale)
Jac1 = (-(out1-out2)*d_S_d_scale*
th.tensor.exp(updated_scale*(out1+out2 - 2*maxout))/(norm_const**2))
Jac2 = -Jac1
return Jac1*output_gradients[0][0]+ Jac2*output_gradients[0][1],
然后可以在 pymc3 中的随机函数的 logp() 方法中使用此 Op:
import pymc3 as pm
class myDist(pm.distributions.Discrete):
def __init__(self, invT, *args, **kwargs):
super(myDist, self).__init__(*args, **kwargs)
self.invT = invT
self.myOp = myOp()
def logp(self, value):
return self.myOp(self.invT)[value]
我希望它能帮助任何(绝望的)pymc3/theano 新手。