级联多个 RNN 模型以获得 N 维输出

Cascade multiple RNN models for N-dimensional output

我在以不寻常的方式将两个模型链接在一起时遇到了一些困难。

我正在尝试复制以下流程图:

为清楚起见,在 Model[0] 的每个时间步,我尝试使用 Model[1]IR[i](中间表示)生成整个时间序列作为重复输入。该方案的目的是允许从一维输入生成参差不齐的二维时间序列(同时允许在不需要该时间步的输出时省略第二个模型,并且不需要 Model[0] 在接受输入和生成输出之间不断地“切换模式”。

我假设需要一个自定义训练循环,并且我已经有一个自定义训练循环来处理第一个模型中的状态(以前的版本在每个时间步只有一个输出)。如图所示,第二个模型应该具有相当短的输出(能够限制为少于 10 个时间步长)。

但归根结底,虽然我可以全神贯注于我想做的事情,但我对 Keras and/or Tensorflow 还不够熟练,无法真正实现它。 (事实上​​,这是我与图书馆的第一个非玩具项目。)

我在文献中搜索了与鹦鹉相似的方案或与 fiddle 相似的示例代码,但没有成功。我什至不知道这个想法是否可以从 TF/Keras.

内部实现

我已经让这两个模型独立工作了。 (因为我已经计算出维度,并用虚拟数据进行了一些训练以获得第二个模型的垃圾输出,而第一个模型基于此问题的先前迭代并且已经过全面训练。)如果我将 Model[0]Model[1] 作为 python 变量(让我们称它们为 model_amodel_b),那么我如何将它们链接在一起来做到这一点?

编辑添加:

如果这一切都不清楚,也许了解每个输入和输出的维度会有所帮助:

每个输入输出的维度为:

输入:(batch_size, model_a_timesteps, input_size)
IR: (batch_size, model_a_timesteps, ir_size)

IR[i](复制后):(batch_size, model_b_timesteps, ir_size)
输出[i]:(batch_size, model_b_timesteps, output_size)
输出:(batch_size, model_a_timesteps, model_b_timesteps, output_size)

由于这个问题有多个主要部分,我专门针对核心挑战进行了问答:。本回答侧重于实现可变输出步长。


描述:

  • 正如在案例 5 中验证的那样,我们可以采用自下而上的优先方法。首先,我们将完整的输入提供给 model_a (A) - 然后,将其输出作为输入提供给 model_b (B),但是这次 一次一个步骤 .
  • 请注意,我们必须链接 B 的输出步骤 per A 的输入步骤,而不是 between A 的输入步骤;即,在您的图表中,梯度在 Out[0][1]Out[0][0] 之间流动,但不会在 Out[2][0]Out[0][1] 之间流动。
  • 为了计算损失,我们使用粗糙张量还是填充张量都无关紧要;但是,我们必须使用填充张量来写入 TensorArray。
  • 下面代码中的循环逻辑是通用的;但是,为简单起见,特定属性处理和隐藏状态传递是硬编码的,但可以重写以实现通用性。

代码: 在底部。


示例:

  • 这里我们预定义了每个来自 A 的输入 B 的迭代次数,但是我们可以实现任意的停止逻辑。例如,我们可以将 B 的 Dense 层的输出作为隐藏状态,并检查其 L2 范数是否超过阈值。
  • 根据上述,如果 longest_step 对我们来说是未知的,我们可以简单地设置它,这对于 NLP 和其他带有 STOP 令牌的任务很常见。
    • 或者,我们可以写在每个 A 的输入处用 dynamic_size=True 分隔 TensorArrays;请参阅下面的“不确定点”。
  • 一个值得关注的问题是,我们如何知道梯度流动正确?请注意,我们已经在链接的问答中针对垂直和水平方向验证了它们,但对于多个输入步骤,它并未涵盖每个输入步骤的多个输出步骤。见下文。

不确定点:我不完全确定渐变是否在例如之间相互作用。 Out[0][1]Out[2][0]。但是,我确实验证了,如果我们为每个 A 的输入写入单独的 TensorArray ,梯度 将不会 水平流动(案例 2);重新实施案例 4 和案例 5,两个 模型的梯度会有所不同,包括具有完整单水平通道的较低模型。

所以一定要统一写入TensorArray。为此,因为没有来自例如的操作。 IR[1]Out[0][1],我看不出 TF 会如何跟踪它 - 所以看来我们是安全的。但是请注意,在下面的示例中,使用 steps_at_t=[1]*6 将使两个模型中的梯度水平流动,因为我们正在写入单个 TensorArray 并传递隐藏状态。

然而,检查的案例是混淆的,B 在所有步骤都是有状态的;取消这个要求,我们可能 not 需要为所有 Out[0]Out[1] 等写一个统一的 TensorArray,但我们仍然必须测试我们知道有用的东西,不再那么简单了。


示例[代码]:

import numpy as np
import tensorflow as tf

#%%# Make data & models, then fit ###########################################
x0 = y0 = tf.constant(np.random.randn(2, 3, 4))
msn = MultiStatefulNetwork(batch_shape=(2, 3, 4), steps_at_t=[3, 4, 2])

#%%#############################################
with tf.GradientTape(persistent=True) as tape:
    outputs = msn(x0)
    # shape: (3, 4, 2, 4), 0-padded
    # We can pad labels accordingly.
    # Note the (2, 4) model_b's output shape, which is a timestep slice;
    # model_b is a *slice model*. Careful in implementing various logics
    # which are and aren't intended to be stateful.

方法:

不是最干净的代码,也不是最优化的代码,但它可以工作;改进空间。

更重要的是:我在 Eager 中实现了它,但不知道它在 Graph 中的工作方式,要使其同时适用于两者可能会非常棘手。如果需要,只需在图表中 运行 并比较所有值,就像在“案例”中所做的那样。

# ideally we won't `import tensorflow` at all; kept for code simplicity
import tensorflow as tf
from tensorflow.python.util import nest
from tensorflow.python.ops import array_ops, tensor_array_ops
from tensorflow.python.framework import ops

from tensorflow.keras.layers import Input, SimpleRNN, SimpleRNNCell
from tensorflow.keras.models import Model

#######################################################################
class MultiStatefulNetwork():
    def __init__(self, batch_shape=(2, 6, 4), steps_at_t=[]):
        self.batch_shape=batch_shape
        self.steps_at_t=steps_at_t

        self.batch_size = batch_shape[0]
        self.units = batch_shape[-1]
        self._build_models()

    def __call__(self, inputs):
        outputs = self._forward_pass_a(inputs)
        outputs = self._forward_pass_b(outputs)
        return outputs

    def _forward_pass_a(self, inputs):
        return self.model_a(inputs, training=True)

    def _forward_pass_b(self, inputs):
        return model_rnn_outer(self.model_b, inputs, self.steps_at_t)

    def _build_models(self):
        ipt = Input(batch_shape=self.batch_shape)
        out = SimpleRNN(self.units, return_sequences=True)(ipt)
        self.model_a = Model(ipt, out)

        ipt  = Input(batch_shape=(self.batch_size, self.units))
        sipt = Input(batch_shape=(self.batch_size, self.units))
        out, state = SimpleRNNCell(4)(ipt, sipt)
        self.model_b = Model([ipt, sipt], [out, state])

        self.model_a.compile('sgd', 'mse')
        self.model_b.compile('sgd', 'mse')


def inner_pass(model, inputs, states):
    return model_rnn(model, inputs, states)


def model_rnn_outer(model, inputs, steps_at_t=[2, 2, 4, 3]):
    def outer_step_function(inputs, states):
        x, steps = inputs
        x = array_ops.expand_dims(x, 0)
        x = array_ops.tile(x, [steps, *[1] * (x.ndim - 1)])  # repeat steps times
        output, new_states = inner_pass(model, x, states)
        return output, new_states

    (outer_steps, steps_at_t, longest_step, outer_t, initial_states,
     output_ta, input_ta) = _process_args_outer(model, inputs, steps_at_t)

    def _outer_step(outer_t, output_ta_t, *states):
        current_input = [input_ta.read(outer_t), steps_at_t.read(outer_t)]
        output, new_states = outer_step_function(current_input, tuple(states))

        # pad if shorter than longest_step.
        # model_b may output twice, but longest in `steps_at_t` is 4; then we need
        # output.shape == (2, *model_b.output_shape) -> (4, *...)
        # checking directly on `output` is more reliable than from `steps_at_t`
        output = tf.cond(
            tf.math.less(output.shape[0], longest_step),
            lambda: tf.pad(output, [[0, longest_step - output.shape[0]],
                                    *[[0, 0]] * (output.ndim - 1)]),
            lambda: output)

        output_ta_t = output_ta_t.write(outer_t, output)
        return (outer_t + 1, output_ta_t) + tuple(new_states)

    final_outputs = tf.while_loop(
        body=_outer_step,
        loop_vars=(outer_t, output_ta) + initial_states,
        cond=lambda outer_t, *_: tf.math.less(outer_t, outer_steps))

    output_ta = final_outputs[1]
    outputs = output_ta.stack()
    return outputs


def _process_args_outer(model, inputs, steps_at_t):
    def swap_batch_timestep(input_t):
        # Swap the batch and timestep dim for the incoming tensor.
        # (samples, timesteps, channels) -> (timesteps, samples, channels)
        # iterating dim0 to feed (samples, channels) slices expected by RNN
        axes = list(range(len(input_t.shape)))
        axes[0], axes[1] = 1, 0
        return array_ops.transpose(input_t, axes)

    inputs = nest.map_structure(swap_batch_timestep, inputs)

    assert inputs.shape[0] == len(steps_at_t)
    outer_steps = array_ops.shape(inputs)[0]  # model_a_steps
    longest_step = max(steps_at_t)
    steps_at_t = tensor_array_ops.TensorArray(
        dtype=tf.int32, size=len(steps_at_t)).unstack(steps_at_t)

    # assume single-input network, excluding states which are handled separately
    input_ta = tensor_array_ops.TensorArray(
        dtype=inputs.dtype,
        size=outer_steps,
        element_shape=tf.TensorShape(model.input_shape[0]),
        tensor_array_name='outer_input_ta_0').unstack(inputs)

    # TensorArray is used to write outputs at every timestep, but does not
    # support RaggedTensor; thus we must make TensorArray such that column length
    # is that of the longest outer step, # and pad model_b's outputs accordingly
    element_shape = tf.TensorShape((longest_step, *model.output_shape[0]))

    # overall shape: (outer_steps, longest_step, *model_b.output_shape)
    # for every input / at each step we write in dim0 (outer_steps)
    output_ta = tensor_array_ops.TensorArray(
        dtype=model.output[0].dtype,
        size=outer_steps,
        element_shape=element_shape,
        tensor_array_name='outer_output_ta_0')

    outer_t = tf.constant(0, dtype='int32')
    initial_states = (tf.zeros(model.input_shape[0], dtype='float32'),)

    return (outer_steps, steps_at_t, longest_step, outer_t, initial_states,
            output_ta, input_ta)


def model_rnn(model, inputs, states):
    def step_function(inputs, states):
        output, new_states = model([inputs, *states], training=True)
        return output, new_states

    initial_states = states
    input_ta, output_ta, time, time_steps_t = _process_args(model, inputs)

    def _step(time, output_ta_t, *states):
        current_input = input_ta.read(time)
        output, new_states = step_function(current_input, tuple(states))

        flat_state = nest.flatten(states)
        flat_new_state = nest.flatten(new_states)
        for state, new_state in zip(flat_state, flat_new_state):
            if isinstance(new_state, ops.Tensor):
                new_state.set_shape(state.shape)

        output_ta_t = output_ta_t.write(time, output)
        new_states = nest.pack_sequence_as(initial_states, flat_new_state)
        return (time + 1, output_ta_t) + tuple(new_states)

    final_outputs = tf.while_loop(
        body=_step,
        loop_vars=(time, output_ta) + tuple(initial_states),
        cond=lambda time, *_: tf.math.less(time, time_steps_t))

    new_states = final_outputs[2:]
    output_ta = final_outputs[1]
    outputs = output_ta.stack()
    return outputs, new_states


def _process_args(model, inputs):
    time_steps_t = tf.constant(inputs.shape[0], dtype='int32')

    # assume single-input network (excluding states)
    input_ta = tensor_array_ops.TensorArray(
        dtype=inputs.dtype,
        size=time_steps_t,
        tensor_array_name='input_ta_0').unstack(inputs)

    # assume single-output network (excluding states)
    output_ta = tensor_array_ops.TensorArray(
        dtype=model.output[0].dtype,
        size=time_steps_t,
        element_shape=tf.TensorShape(model.output_shape[0]),
        tensor_array_name='output_ta_0')

    time = tf.constant(0, dtype='int32', name='time')
    return input_ta, output_ta, time, time_steps_t