如何使用 theano.op 在 pymc3 中编写自定义确定性或随机性?

How to write a custom Deterministic or Stochastic in pymc3 with theano.op?

我正在做一些 pymc3,我想创建自定义随机指标,但是似乎没有很多关于它是如何完成的文档。我知道如何使用 as_op way,但显然这使得无法使用 NUTS 采样器,在这种情况下我看不到 pymc3 相对于 pymc 的优势。

教程中提到可以通过继承theano.Op来完成。但是谁能告诉我它是如何工作的(我还在开始使用 theano)?我有两个要定义的随机指标。

第一个应该更简单,它是一个 N 维向量 F,只有常量父变量:

with myModel:
    F = DensityDist('F', lambda value: pymc.skew_normal_like(value, F_mu_array, F_std_array, F_a_array), shape = N)

我想要一个偏正态分布,pymc3好像还没有实现,我刚导入了pymc2版本。不幸的是,F_mu_array, F_std_array, F_a_array and F 都是 N 维向量,而 lambda 似乎不适用于 N 维列表 value

首先,有没有办法让lambda输入为N维数组?如果没有,我想我需要直接定义随机指标 F,这就是我认为我需要 theano.Op 才能使其工作的地方。


第二个例子是其他Stochastics的一个更复杂的函数。在这里我想如何定义它(目前不正确):

with myModel:
    ln2_var = Uniform('ln2_var', lower=-10, upper=4)
    sigma = Deterministic('sigma', exp(0.5*ln2_var))        
    A = Uniform('A', lower=-10, upper=10, shape=5)
    C = Uniform('C', lower=0.0, upper=2.0, shape=5)
    sw = Normal('sw', mu=5.5, sd=0.5, shape=5)

    # F from before
    F = DensityDist('F', lambda value: skew_normal_like(value, F_mu_array, F_std_array, F_a_array), shape = N)
    M = Normal('M', mu=M_obs_array, sd=M_stdev, shape=N)

    #   Radius forward-model (THIS IS THE STOCHASTIC IN QUESTION)
    R = Normal('R', mu = R_forward(F, M, A, C, sw, N), sd=sigma, shape=N)

其中函数 R_forward(F,M,A,C,sw,N) 被天真地定义为:

from theano.tensor import lt, le, eq, gt, ge

def R_forward(Flux, Mass, A, C, sw, num):
    for i in range(num):
        if lt(Mass[i], 0.2):
            if lt(Flux[i], sw[0]):
                muR = C[0]
            else:
                muR = A[0]*log10(Flux[i]) + C[0] - A[0]*log10(sw[0])
        elif (le(0.2, Mass[i]) or le(Mass[i], 0.5)):
            if lt(Flux[i], sw[1]):
                muR = C[1]
            else:
                muR = A[1]*log10(Flux[i]) + C[1] - A[1]*log10(sw[1])
        elif (le(0.5, Mass[i]) or le(Mass[i], 1.5)):
            if lt(Flux[i], sw[2]):
                muR = C[2]
            else:
                muR = A[2]*log10(Flux[i]) + C[2] - A[2]*log10(sw[2])
        elif (le(1.5, Mass[i]) or le(Mass[i], 3.5)):
            if lt(Flux[i], sw[3]):
                muR = C[3]
            else:
                muR = A[3]*log10(Flux[i]) + C[3] - A[3]*log10(sw[3])
        else:
            if lt(Flux[i], sw[4]):
                muR = C[4]
            else:
                muR = A[4]*log10(Flux[i]) + C[4] - A[4]*log10(sw[4])
    return muR

这当然是行不通的。我知道如何使用 as_op,但我想保留 NUTS 采样。

我意识到现在有点晚了,但我想我还是会(相当含糊地)回答这个问题。

如果你想定义一个随机函数(例如概率分布),那么你需要做几件事:

首先,定义离散 (pymc3.distributions.Discrete) 或连​​续的子class,它至少具有方法 logp,returns 随机的对数似然。如果您将其定义为一个简单的符号方程 (x+1),我相信您不需要处理任何梯度(但不要在这方面引用我的话;see the documentation about this)。我将在下面处理更复杂的案例。在不幸的情况下,你需要做任何更复杂的事情,如你的第二个例子(顺便说一下,pymc3 现在实现了偏态正态分布),你需要定义它所需的操作(在 logp 方法中使用)作为Theano Op。如果你不需要导数,那么 as_op 就可以了,但正如你所说,梯度是 pymc3 的一种想法。

这就是事情变得复杂的地方。如果您想使用 NUTS(或出于任何原因需要梯度),那么您需要将 logp 中使用的操作实现为 theano.gof.Op 的子 class。您的新 op class(我们从现在起就称它为 Op)至少需要两个或三个方法。第一个将 inputs/outputs 定义为 Op (check the Op documentation)。 perform() 方法(或您可能选择的变体)是执行您想要的操作的方法(例如,您的 R_forward 函数)。如果您愿意,这可以在纯 python 中完成。第三种方法 grad() 是您定义 perform() 输出与输入的梯度的地方。 grad() 的实际输出有点不同,但没什么大不了的。

正是在 grad() 中,使用 Theano 得到了回报。如果您在 Theano 中定义了整个 perform(),那么您可能可以轻松地使用自动微分(theano.tensor.grad 或 theano.tensor.jacobian)为您完成工作(参见下面的示例)。然而,这并不一定容易。

在你的第二个例子中,这意味着在 Theano 中实现你的 R_forward 函数,这可能很复杂。

在这里,我包含了一个我在学习做这些事情时创建的 Op 的最小示例。

def my_th_fun():
    """ Some needed auxiliary functions.
    """
    X = th.tensor.vector('X')
    SCALE = th.tensor.scalar('SCALE')

    X.tag.test_value = np.array([1,2,3,4])
    SCALE.tag.test_value = 5.

    Scale, upd_sm_X = th.scan(lambda x, scale: scale*(scale+ x),
                               sequences=[X],
                               outputs_info=[SCALE])
    fun_Scale = th.function(inputs=[X, SCALE], outputs=Scale)
    D_out_d_scale = th.tensor.grad(Scale[-1], SCALE)
    fun_d_out_d_scale = th.function([X, SCALE], D_out_d_scale)
    return Scale, fun_Scale, D_out_d_scale, fun_d_out_d_scale

class myOp(th.gof.Op):
    """ Op subclass with a somewhat silly computation. It uses
    th.scan and th.tensor.grad is used to calculate the gradient
    automagically in the grad() method.
    """
    __props__ = ()
    itypes = [th.tensor.dscalar]
    otypes = [th.tensor.dvector]
    def __init__(self, *args, **kwargs):
        super(myOp, self).__init__(*args, **kwargs)
        self.base_dist = np.arange(1,5)
        (self.UPD_scale, self.fun_scale, 
         self.D_out_d_scale, self.fun_d_out_d_scale)= my_th_fun()

    def perform(self, node, inputs, outputs):
        scale = inputs[0]
        updated_scale = self.fun_scale(self.base_dist, scale)
        out1 = self.base_dist[0:2].sum()
        out2 = self.base_dist[2:4].sum()
        maxout = np.max([out1, out2])
        exp_out1 = np.exp(updated_scale[-1]*(out1-maxout))
        exp_out2 = np.exp(updated_scale[-1]*(out2-maxout))
        norm_const = exp_out1 + exp_out2
        outputs[0][0] = np.array([exp_out1/norm_const, exp_out2/norm_const])

    def grad(self, inputs, output_gradients): #working!
        """ Calculates the gradient of the output of the Op wrt
        to the input. As a simple example, the input is scalar.

        Notice how the output is actually the gradient multiplied
        by the output_gradients, which is an input provided by
        theano when calculating gradients.
        """
        scale = inputs[0]
        X = th.tensor.as_tensor(self.base_dist)
        # Do I need to recalculate all this or can I assume that perform() has
        # always been called before grad() and thus can take it from there?
        # In any case, this is a small enough example to recalculate quickly:
        all_scale, _ = th.scan(lambda x, scale_1: scale_1*(scale_1+ x),
                               sequences=[X],
                               outputs_info=[scale])
        updated_scale = all_scale[-1]

        out1 = self.base_dist[0:1].sum()
        out2 = self.base_dist[2:3].sum()
        maxout = np.max([out1, out2])

        exp_out1 = th.tensor.exp(updated_scale*(out1 - maxout))
        exp_out2 = th.tensor.exp(updated_scale*(out2 - maxout))
        norm_const = exp_out1 + exp_out2

        d_S_d_scale = th.theano.grad(all_scale[-1], scale)
        Jac1 = (-(out1-out2)*d_S_d_scale*
                th.tensor.exp(updated_scale*(out1+out2 - 2*maxout))/(norm_const**2))
        Jac2 = -Jac1
        return Jac1*output_gradients[0][0]+ Jac2*output_gradients[0][1],

然后可以在 pymc3 中的随机函数的 logp() 方法中使用此 Op:

import pymc3 as pm

class myDist(pm.distributions.Discrete):
    def __init__(self, invT, *args, **kwargs):
        super(myDist, self).__init__(*args, **kwargs)
        self.invT = invT
        self.myOp = myOp()
    def logp(self, value):
        return self.myOp(self.invT)[value]

我希望它能帮助任何(绝望的)pymc3/theano 新手。