级联多个 RNN 模型以获得 N 维输出
Cascade multiple RNN models for N-dimensional output
我在以不寻常的方式将两个模型链接在一起时遇到了一些困难。
我正在尝试复制以下流程图:
为清楚起见,在 Model[0]
的每个时间步,我尝试使用 Model[1]
从 IR[i]
(中间表示)生成整个时间序列作为重复输入。该方案的目的是允许从一维输入生成参差不齐的二维时间序列(同时允许在不需要该时间步的输出时省略第二个模型,并且不需要 Model[0]
在接受输入和生成输出之间不断地“切换模式”。
我假设需要一个自定义训练循环,并且我已经有一个自定义训练循环来处理第一个模型中的状态(以前的版本在每个时间步只有一个输出)。如图所示,第二个模型应该具有相当短的输出(能够限制为少于 10 个时间步长)。
但归根结底,虽然我可以全神贯注于我想做的事情,但我对 Keras and/or Tensorflow 还不够熟练,无法真正实现它。 (事实上,这是我与图书馆的第一个非玩具项目。)
我在文献中搜索了与鹦鹉相似的方案或与 fiddle 相似的示例代码,但没有成功。我什至不知道这个想法是否可以从 TF/Keras.
内部实现
我已经让这两个模型独立工作了。 (因为我已经计算出维度,并用虚拟数据进行了一些训练以获得第二个模型的垃圾输出,而第一个模型基于此问题的先前迭代并且已经过全面训练。)如果我将 Model[0]
和 Model[1]
作为 python 变量(让我们称它们为 model_a
和 model_b
),那么我如何将它们链接在一起来做到这一点?
编辑添加:
如果这一切都不清楚,也许了解每个输入和输出的维度会有所帮助:
每个输入输出的维度为:
输入:(batch_size, model_a_timesteps, input_size)
IR: (batch_size, model_a_timesteps, ir_size)
IR[i](复制后):(batch_size, model_b_timesteps, ir_size)
输出[i]:(batch_size, model_b_timesteps, output_size)
输出:(batch_size, model_a_timesteps, model_b_timesteps, output_size)
由于这个问题有多个主要部分,我专门针对核心挑战进行了问答:。本回答侧重于实现可变输出步长。
描述:
- 正如在案例 5 中验证的那样,我们可以采用自下而上的优先方法。首先,我们将完整的输入提供给
model_a
(A) - 然后,将其输出作为输入提供给 model_b
(B),但是这次 一次一个步骤 .
- 请注意,我们必须链接 B 的输出步骤 per A 的输入步骤,而不是 between A 的输入步骤;即,在您的图表中,梯度在
Out[0][1]
和 Out[0][0]
之间流动,但不会在 Out[2][0]
和 Out[0][1]
之间流动。
- 为了计算损失,我们使用粗糙张量还是填充张量都无关紧要;但是,我们必须使用填充张量来写入 TensorArray。
- 下面代码中的循环逻辑是通用的;但是,为简单起见,特定属性处理和隐藏状态传递是硬编码的,但可以重写以实现通用性。
代码: 在底部。
示例:
- 这里我们预定义了每个来自 A 的输入 B 的迭代次数,但是我们可以实现任意的停止逻辑。例如,我们可以将 B 的
Dense
层的输出作为隐藏状态,并检查其 L2 范数是否超过阈值。
- 根据上述,如果
longest_step
对我们来说是未知的,我们可以简单地设置它,这对于 NLP 和其他带有 STOP 令牌的任务很常见。
- 或者,我们可以写在每个 A 的输入处用
dynamic_size=True
分隔 TensorArrays
;请参阅下面的“不确定点”。
- 一个值得关注的问题是,我们如何知道梯度流动正确?请注意,我们已经在链接的问答中针对垂直和水平方向验证了它们,但对于多个输入步骤,它并未涵盖每个输入步骤的多个输出步骤。见下文。
不确定点:我不完全确定渐变是否在例如之间相互作用。 Out[0][1]
和 Out[2][0]
。但是,我确实验证了,如果我们为每个 A 的输入写入单独的 TensorArray
,梯度 将不会 水平流动(案例 2);重新实施案例 4 和案例 5,两个 模型的梯度会有所不同,包括具有完整单水平通道的较低模型。
所以一定要统一写入TensorArray
。为此,因为没有来自例如的操作。 IR[1]
到 Out[0][1]
,我看不出 TF 会如何跟踪它 - 所以看来我们是安全的。但是请注意,在下面的示例中,使用 steps_at_t=[1]*6
将使两个模型中的梯度水平流动,因为我们正在写入单个 TensorArray
并传递隐藏状态。
然而,检查的案例是混淆的,B 在所有步骤都是有状态的;取消这个要求,我们可能 not 需要为所有 Out[0]
、Out[1]
等写一个统一的 TensorArray
,但我们仍然必须测试我们知道有用的东西,不再那么简单了。
示例[代码]:
import numpy as np
import tensorflow as tf
#%%# Make data & models, then fit ###########################################
x0 = y0 = tf.constant(np.random.randn(2, 3, 4))
msn = MultiStatefulNetwork(batch_shape=(2, 3, 4), steps_at_t=[3, 4, 2])
#%%#############################################
with tf.GradientTape(persistent=True) as tape:
outputs = msn(x0)
# shape: (3, 4, 2, 4), 0-padded
# We can pad labels accordingly.
# Note the (2, 4) model_b's output shape, which is a timestep slice;
# model_b is a *slice model*. Careful in implementing various logics
# which are and aren't intended to be stateful.
方法:
不是最干净的代码,也不是最优化的代码,但它可以工作;改进空间。
更重要的是:我在 Eager 中实现了它,但不知道它在 Graph 中的工作方式,要使其同时适用于两者可能会非常棘手。如果需要,只需在图表中 运行 并比较所有值,就像在“案例”中所做的那样。
# ideally we won't `import tensorflow` at all; kept for code simplicity
import tensorflow as tf
from tensorflow.python.util import nest
from tensorflow.python.ops import array_ops, tensor_array_ops
from tensorflow.python.framework import ops
from tensorflow.keras.layers import Input, SimpleRNN, SimpleRNNCell
from tensorflow.keras.models import Model
#######################################################################
class MultiStatefulNetwork():
def __init__(self, batch_shape=(2, 6, 4), steps_at_t=[]):
self.batch_shape=batch_shape
self.steps_at_t=steps_at_t
self.batch_size = batch_shape[0]
self.units = batch_shape[-1]
self._build_models()
def __call__(self, inputs):
outputs = self._forward_pass_a(inputs)
outputs = self._forward_pass_b(outputs)
return outputs
def _forward_pass_a(self, inputs):
return self.model_a(inputs, training=True)
def _forward_pass_b(self, inputs):
return model_rnn_outer(self.model_b, inputs, self.steps_at_t)
def _build_models(self):
ipt = Input(batch_shape=self.batch_shape)
out = SimpleRNN(self.units, return_sequences=True)(ipt)
self.model_a = Model(ipt, out)
ipt = Input(batch_shape=(self.batch_size, self.units))
sipt = Input(batch_shape=(self.batch_size, self.units))
out, state = SimpleRNNCell(4)(ipt, sipt)
self.model_b = Model([ipt, sipt], [out, state])
self.model_a.compile('sgd', 'mse')
self.model_b.compile('sgd', 'mse')
def inner_pass(model, inputs, states):
return model_rnn(model, inputs, states)
def model_rnn_outer(model, inputs, steps_at_t=[2, 2, 4, 3]):
def outer_step_function(inputs, states):
x, steps = inputs
x = array_ops.expand_dims(x, 0)
x = array_ops.tile(x, [steps, *[1] * (x.ndim - 1)]) # repeat steps times
output, new_states = inner_pass(model, x, states)
return output, new_states
(outer_steps, steps_at_t, longest_step, outer_t, initial_states,
output_ta, input_ta) = _process_args_outer(model, inputs, steps_at_t)
def _outer_step(outer_t, output_ta_t, *states):
current_input = [input_ta.read(outer_t), steps_at_t.read(outer_t)]
output, new_states = outer_step_function(current_input, tuple(states))
# pad if shorter than longest_step.
# model_b may output twice, but longest in `steps_at_t` is 4; then we need
# output.shape == (2, *model_b.output_shape) -> (4, *...)
# checking directly on `output` is more reliable than from `steps_at_t`
output = tf.cond(
tf.math.less(output.shape[0], longest_step),
lambda: tf.pad(output, [[0, longest_step - output.shape[0]],
*[[0, 0]] * (output.ndim - 1)]),
lambda: output)
output_ta_t = output_ta_t.write(outer_t, output)
return (outer_t + 1, output_ta_t) + tuple(new_states)
final_outputs = tf.while_loop(
body=_outer_step,
loop_vars=(outer_t, output_ta) + initial_states,
cond=lambda outer_t, *_: tf.math.less(outer_t, outer_steps))
output_ta = final_outputs[1]
outputs = output_ta.stack()
return outputs
def _process_args_outer(model, inputs, steps_at_t):
def swap_batch_timestep(input_t):
# Swap the batch and timestep dim for the incoming tensor.
# (samples, timesteps, channels) -> (timesteps, samples, channels)
# iterating dim0 to feed (samples, channels) slices expected by RNN
axes = list(range(len(input_t.shape)))
axes[0], axes[1] = 1, 0
return array_ops.transpose(input_t, axes)
inputs = nest.map_structure(swap_batch_timestep, inputs)
assert inputs.shape[0] == len(steps_at_t)
outer_steps = array_ops.shape(inputs)[0] # model_a_steps
longest_step = max(steps_at_t)
steps_at_t = tensor_array_ops.TensorArray(
dtype=tf.int32, size=len(steps_at_t)).unstack(steps_at_t)
# assume single-input network, excluding states which are handled separately
input_ta = tensor_array_ops.TensorArray(
dtype=inputs.dtype,
size=outer_steps,
element_shape=tf.TensorShape(model.input_shape[0]),
tensor_array_name='outer_input_ta_0').unstack(inputs)
# TensorArray is used to write outputs at every timestep, but does not
# support RaggedTensor; thus we must make TensorArray such that column length
# is that of the longest outer step, # and pad model_b's outputs accordingly
element_shape = tf.TensorShape((longest_step, *model.output_shape[0]))
# overall shape: (outer_steps, longest_step, *model_b.output_shape)
# for every input / at each step we write in dim0 (outer_steps)
output_ta = tensor_array_ops.TensorArray(
dtype=model.output[0].dtype,
size=outer_steps,
element_shape=element_shape,
tensor_array_name='outer_output_ta_0')
outer_t = tf.constant(0, dtype='int32')
initial_states = (tf.zeros(model.input_shape[0], dtype='float32'),)
return (outer_steps, steps_at_t, longest_step, outer_t, initial_states,
output_ta, input_ta)
def model_rnn(model, inputs, states):
def step_function(inputs, states):
output, new_states = model([inputs, *states], training=True)
return output, new_states
initial_states = states
input_ta, output_ta, time, time_steps_t = _process_args(model, inputs)
def _step(time, output_ta_t, *states):
current_input = input_ta.read(time)
output, new_states = step_function(current_input, tuple(states))
flat_state = nest.flatten(states)
flat_new_state = nest.flatten(new_states)
for state, new_state in zip(flat_state, flat_new_state):
if isinstance(new_state, ops.Tensor):
new_state.set_shape(state.shape)
output_ta_t = output_ta_t.write(time, output)
new_states = nest.pack_sequence_as(initial_states, flat_new_state)
return (time + 1, output_ta_t) + tuple(new_states)
final_outputs = tf.while_loop(
body=_step,
loop_vars=(time, output_ta) + tuple(initial_states),
cond=lambda time, *_: tf.math.less(time, time_steps_t))
new_states = final_outputs[2:]
output_ta = final_outputs[1]
outputs = output_ta.stack()
return outputs, new_states
def _process_args(model, inputs):
time_steps_t = tf.constant(inputs.shape[0], dtype='int32')
# assume single-input network (excluding states)
input_ta = tensor_array_ops.TensorArray(
dtype=inputs.dtype,
size=time_steps_t,
tensor_array_name='input_ta_0').unstack(inputs)
# assume single-output network (excluding states)
output_ta = tensor_array_ops.TensorArray(
dtype=model.output[0].dtype,
size=time_steps_t,
element_shape=tf.TensorShape(model.output_shape[0]),
tensor_array_name='output_ta_0')
time = tf.constant(0, dtype='int32', name='time')
return input_ta, output_ta, time, time_steps_t
我在以不寻常的方式将两个模型链接在一起时遇到了一些困难。
我正在尝试复制以下流程图:
为清楚起见,在 Model[0]
的每个时间步,我尝试使用 Model[1]
从 IR[i]
(中间表示)生成整个时间序列作为重复输入。该方案的目的是允许从一维输入生成参差不齐的二维时间序列(同时允许在不需要该时间步的输出时省略第二个模型,并且不需要 Model[0]
在接受输入和生成输出之间不断地“切换模式”。
我假设需要一个自定义训练循环,并且我已经有一个自定义训练循环来处理第一个模型中的状态(以前的版本在每个时间步只有一个输出)。如图所示,第二个模型应该具有相当短的输出(能够限制为少于 10 个时间步长)。
但归根结底,虽然我可以全神贯注于我想做的事情,但我对 Keras and/or Tensorflow 还不够熟练,无法真正实现它。 (事实上,这是我与图书馆的第一个非玩具项目。)
我在文献中搜索了与鹦鹉相似的方案或与 fiddle 相似的示例代码,但没有成功。我什至不知道这个想法是否可以从 TF/Keras.
内部实现我已经让这两个模型独立工作了。 (因为我已经计算出维度,并用虚拟数据进行了一些训练以获得第二个模型的垃圾输出,而第一个模型基于此问题的先前迭代并且已经过全面训练。)如果我将 Model[0]
和 Model[1]
作为 python 变量(让我们称它们为 model_a
和 model_b
),那么我如何将它们链接在一起来做到这一点?
编辑添加:
如果这一切都不清楚,也许了解每个输入和输出的维度会有所帮助:
每个输入输出的维度为:
输入:(batch_size, model_a_timesteps, input_size)
IR: (batch_size, model_a_timesteps, ir_size)
IR[i](复制后):(batch_size, model_b_timesteps, ir_size)
输出[i]:(batch_size, model_b_timesteps, output_size)
输出:(batch_size, model_a_timesteps, model_b_timesteps, output_size)
由于这个问题有多个主要部分,我专门针对核心挑战进行了问答:
描述:
- 正如在案例 5 中验证的那样,我们可以采用自下而上的优先方法。首先,我们将完整的输入提供给
model_a
(A) - 然后,将其输出作为输入提供给model_b
(B),但是这次 一次一个步骤 . - 请注意,我们必须链接 B 的输出步骤 per A 的输入步骤,而不是 between A 的输入步骤;即,在您的图表中,梯度在
Out[0][1]
和Out[0][0]
之间流动,但不会在Out[2][0]
和Out[0][1]
之间流动。 - 为了计算损失,我们使用粗糙张量还是填充张量都无关紧要;但是,我们必须使用填充张量来写入 TensorArray。
- 下面代码中的循环逻辑是通用的;但是,为简单起见,特定属性处理和隐藏状态传递是硬编码的,但可以重写以实现通用性。
代码: 在底部。
示例:
- 这里我们预定义了每个来自 A 的输入 B 的迭代次数,但是我们可以实现任意的停止逻辑。例如,我们可以将 B 的
Dense
层的输出作为隐藏状态,并检查其 L2 范数是否超过阈值。 - 根据上述,如果
longest_step
对我们来说是未知的,我们可以简单地设置它,这对于 NLP 和其他带有 STOP 令牌的任务很常见。- 或者,我们可以写在每个 A 的输入处用
dynamic_size=True
分隔TensorArrays
;请参阅下面的“不确定点”。
- 或者,我们可以写在每个 A 的输入处用
- 一个值得关注的问题是,我们如何知道梯度流动正确?请注意,我们已经在链接的问答中针对垂直和水平方向验证了它们,但对于多个输入步骤,它并未涵盖每个输入步骤的多个输出步骤。见下文。
不确定点:我不完全确定渐变是否在例如之间相互作用。 Out[0][1]
和 Out[2][0]
。但是,我确实验证了,如果我们为每个 A 的输入写入单独的 TensorArray
,梯度 将不会 水平流动(案例 2);重新实施案例 4 和案例 5,两个 模型的梯度会有所不同,包括具有完整单水平通道的较低模型。
所以一定要统一写入TensorArray
。为此,因为没有来自例如的操作。 IR[1]
到 Out[0][1]
,我看不出 TF 会如何跟踪它 - 所以看来我们是安全的。但是请注意,在下面的示例中,使用 steps_at_t=[1]*6
将使两个模型中的梯度水平流动,因为我们正在写入单个 TensorArray
并传递隐藏状态。
然而,检查的案例是混淆的,B 在所有步骤都是有状态的;取消这个要求,我们可能 not 需要为所有 Out[0]
、Out[1]
等写一个统一的 TensorArray
,但我们仍然必须测试我们知道有用的东西,不再那么简单了。
示例[代码]:
import numpy as np
import tensorflow as tf
#%%# Make data & models, then fit ###########################################
x0 = y0 = tf.constant(np.random.randn(2, 3, 4))
msn = MultiStatefulNetwork(batch_shape=(2, 3, 4), steps_at_t=[3, 4, 2])
#%%#############################################
with tf.GradientTape(persistent=True) as tape:
outputs = msn(x0)
# shape: (3, 4, 2, 4), 0-padded
# We can pad labels accordingly.
# Note the (2, 4) model_b's output shape, which is a timestep slice;
# model_b is a *slice model*. Careful in implementing various logics
# which are and aren't intended to be stateful.
方法:
不是最干净的代码,也不是最优化的代码,但它可以工作;改进空间。
更重要的是:我在 Eager 中实现了它,但不知道它在 Graph 中的工作方式,要使其同时适用于两者可能会非常棘手。如果需要,只需在图表中 运行 并比较所有值,就像在“案例”中所做的那样。
# ideally we won't `import tensorflow` at all; kept for code simplicity
import tensorflow as tf
from tensorflow.python.util import nest
from tensorflow.python.ops import array_ops, tensor_array_ops
from tensorflow.python.framework import ops
from tensorflow.keras.layers import Input, SimpleRNN, SimpleRNNCell
from tensorflow.keras.models import Model
#######################################################################
class MultiStatefulNetwork():
def __init__(self, batch_shape=(2, 6, 4), steps_at_t=[]):
self.batch_shape=batch_shape
self.steps_at_t=steps_at_t
self.batch_size = batch_shape[0]
self.units = batch_shape[-1]
self._build_models()
def __call__(self, inputs):
outputs = self._forward_pass_a(inputs)
outputs = self._forward_pass_b(outputs)
return outputs
def _forward_pass_a(self, inputs):
return self.model_a(inputs, training=True)
def _forward_pass_b(self, inputs):
return model_rnn_outer(self.model_b, inputs, self.steps_at_t)
def _build_models(self):
ipt = Input(batch_shape=self.batch_shape)
out = SimpleRNN(self.units, return_sequences=True)(ipt)
self.model_a = Model(ipt, out)
ipt = Input(batch_shape=(self.batch_size, self.units))
sipt = Input(batch_shape=(self.batch_size, self.units))
out, state = SimpleRNNCell(4)(ipt, sipt)
self.model_b = Model([ipt, sipt], [out, state])
self.model_a.compile('sgd', 'mse')
self.model_b.compile('sgd', 'mse')
def inner_pass(model, inputs, states):
return model_rnn(model, inputs, states)
def model_rnn_outer(model, inputs, steps_at_t=[2, 2, 4, 3]):
def outer_step_function(inputs, states):
x, steps = inputs
x = array_ops.expand_dims(x, 0)
x = array_ops.tile(x, [steps, *[1] * (x.ndim - 1)]) # repeat steps times
output, new_states = inner_pass(model, x, states)
return output, new_states
(outer_steps, steps_at_t, longest_step, outer_t, initial_states,
output_ta, input_ta) = _process_args_outer(model, inputs, steps_at_t)
def _outer_step(outer_t, output_ta_t, *states):
current_input = [input_ta.read(outer_t), steps_at_t.read(outer_t)]
output, new_states = outer_step_function(current_input, tuple(states))
# pad if shorter than longest_step.
# model_b may output twice, but longest in `steps_at_t` is 4; then we need
# output.shape == (2, *model_b.output_shape) -> (4, *...)
# checking directly on `output` is more reliable than from `steps_at_t`
output = tf.cond(
tf.math.less(output.shape[0], longest_step),
lambda: tf.pad(output, [[0, longest_step - output.shape[0]],
*[[0, 0]] * (output.ndim - 1)]),
lambda: output)
output_ta_t = output_ta_t.write(outer_t, output)
return (outer_t + 1, output_ta_t) + tuple(new_states)
final_outputs = tf.while_loop(
body=_outer_step,
loop_vars=(outer_t, output_ta) + initial_states,
cond=lambda outer_t, *_: tf.math.less(outer_t, outer_steps))
output_ta = final_outputs[1]
outputs = output_ta.stack()
return outputs
def _process_args_outer(model, inputs, steps_at_t):
def swap_batch_timestep(input_t):
# Swap the batch and timestep dim for the incoming tensor.
# (samples, timesteps, channels) -> (timesteps, samples, channels)
# iterating dim0 to feed (samples, channels) slices expected by RNN
axes = list(range(len(input_t.shape)))
axes[0], axes[1] = 1, 0
return array_ops.transpose(input_t, axes)
inputs = nest.map_structure(swap_batch_timestep, inputs)
assert inputs.shape[0] == len(steps_at_t)
outer_steps = array_ops.shape(inputs)[0] # model_a_steps
longest_step = max(steps_at_t)
steps_at_t = tensor_array_ops.TensorArray(
dtype=tf.int32, size=len(steps_at_t)).unstack(steps_at_t)
# assume single-input network, excluding states which are handled separately
input_ta = tensor_array_ops.TensorArray(
dtype=inputs.dtype,
size=outer_steps,
element_shape=tf.TensorShape(model.input_shape[0]),
tensor_array_name='outer_input_ta_0').unstack(inputs)
# TensorArray is used to write outputs at every timestep, but does not
# support RaggedTensor; thus we must make TensorArray such that column length
# is that of the longest outer step, # and pad model_b's outputs accordingly
element_shape = tf.TensorShape((longest_step, *model.output_shape[0]))
# overall shape: (outer_steps, longest_step, *model_b.output_shape)
# for every input / at each step we write in dim0 (outer_steps)
output_ta = tensor_array_ops.TensorArray(
dtype=model.output[0].dtype,
size=outer_steps,
element_shape=element_shape,
tensor_array_name='outer_output_ta_0')
outer_t = tf.constant(0, dtype='int32')
initial_states = (tf.zeros(model.input_shape[0], dtype='float32'),)
return (outer_steps, steps_at_t, longest_step, outer_t, initial_states,
output_ta, input_ta)
def model_rnn(model, inputs, states):
def step_function(inputs, states):
output, new_states = model([inputs, *states], training=True)
return output, new_states
initial_states = states
input_ta, output_ta, time, time_steps_t = _process_args(model, inputs)
def _step(time, output_ta_t, *states):
current_input = input_ta.read(time)
output, new_states = step_function(current_input, tuple(states))
flat_state = nest.flatten(states)
flat_new_state = nest.flatten(new_states)
for state, new_state in zip(flat_state, flat_new_state):
if isinstance(new_state, ops.Tensor):
new_state.set_shape(state.shape)
output_ta_t = output_ta_t.write(time, output)
new_states = nest.pack_sequence_as(initial_states, flat_new_state)
return (time + 1, output_ta_t) + tuple(new_states)
final_outputs = tf.while_loop(
body=_step,
loop_vars=(time, output_ta) + tuple(initial_states),
cond=lambda time, *_: tf.math.less(time, time_steps_t))
new_states = final_outputs[2:]
output_ta = final_outputs[1]
outputs = output_ta.stack()
return outputs, new_states
def _process_args(model, inputs):
time_steps_t = tf.constant(inputs.shape[0], dtype='int32')
# assume single-input network (excluding states)
input_ta = tensor_array_ops.TensorArray(
dtype=inputs.dtype,
size=time_steps_t,
tensor_array_name='input_ta_0').unstack(inputs)
# assume single-output network (excluding states)
output_ta = tensor_array_ops.TensorArray(
dtype=model.output[0].dtype,
size=time_steps_t,
element_shape=tf.TensorShape(model.output_shape[0]),
tensor_array_name='output_ta_0')
time = tf.constant(0, dtype='int32', name='time')
return input_ta, output_ta, time, time_steps_t