Keras training with shuffled tf.data:如果训练中断,如何在最后保存的检查点的最后数据iteration/order处继续训练

Keras training with shuffled tf.data: if training is interrupted, how to continue training at last data iteration/order of last saved checkpoint

我正在使用 keras model.fit 进行训练,数据来自 tf.records,加载到 tf.data 对象中,该对象使用 .shuffle 打乱数据。我还使用 callbacks.ModelCheckpoint 每隔 x 个 steps/batches 保存模型。

有时我的云实例会在 epoch 完成之前断开连接或崩溃,但 y 步骤中的模型已保存到我的驱动器中。

我想在训练另一个时期之前完成那个时期的数据训练(我有很长的时期),所以每个数据示例每个时期都训练一次。

有没有办法获取数据的原始顺序,以及模型最后保存在数据中的位置?

到目前为止我发现了什么

看来您可以通过设置种子来设置 .shuffle 中的特定顺序。但是,洗牌只发生在缓冲区中,所以我不能 100% 确定设置种子是否会完美地重现顺序。另外,我不确定 reshuffle_each_iteration 将如何工作。每个时期后是否使用不同的种子?如果是这样,我想解决方法是一次只训练 1 个时期,每个时期都有一个指定的种子。

即使我确实获得了训练订单的副本,我也不确定如何找到模型最后保存在订单中的位置,然后从该点开始训练。我必须得到订单的一个想法是手动遍历数据集,直到我到达它。尽管我不确定 model.fit() 是否会从此订单继续,还是从头开始。 F

为了从上次保存模型的位置获取 step/batch 编号,我可能可以将其记录在某处。

这些解决方案似乎是粗略的解决方法,我想知道 Keras 中是否有我可能忽略的一些功能来帮助解决这个问题。

我想你想恢复洗牌顺序以避免在这个时期内重复某些样本。

根据 shuffle description 在未完成的时代,您的模型只能访问数据集中的前 current_step_number + shuffle_buffer_size 个样本。

因此,如果您知道处理了多少步,那么当您恢复训练时,您可以跳过这一步 + 跳过 shuffle_buffer_size 步,您将继续对以下样本进行训练,这些样本在内部尚未观察到当前纪元。

请注意,数据集第一部分的一些随机 shuffle_buffer_size 样本在这个时期根本不会被观察到。正如你所说你的时代很长,所以,可能你有很多数据,所以丢失 shuffle_buffer_size 个样本对你来说应该不是问题。

所以在保存检查点时也保存步数,然后在加载检查点后创建具有跳过步骤的数据集副本(使用 dataset.skip),然后使用 model.fit 这个较小的数据集一个时期(完成当前纪元),然后以通常的方式继续训练。

似乎没有 keras 构建方式可以做到这一点,但如果我错了,请纠正我。

我的方法

Dataset.shuffle 内部使用初始种子值生成种子,用于 reshuffle_each_iteration=True 迭代期间的重新洗牌。因此,为特定时期重新创建相同的顺序,并在该特定批次继续训练时期,我们必须使用相同的种子重新创建数据集,并将数据集迭代器移动到相同的时期和相同的批次。

调试

为了调试并确保 epochs 和 batches 以相同的顺序生成,我们需要一种方法来打印每个 epoch-batch 中的数据点是如何被拾取的。这在 kears 中很棘手,因此出于调试目的,我将使用回归问题并将基本事实作为序号。然后我可以有一个自定义损失,在那里我可以打印基本事实并让用户的顺序是正确的。

模型和数据

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import keras.backend as K


# Data
x_train = np.random.randn(15, 10).astype("float32")
y_train = np.arange(15).astype("float32")

# Custom MSE looss just to track the order in which data is picked up
def my_mse(y_true, y_pred):
    tf.print(tf.keras.backend.flatten(y_true))
    loss = K.square(y_pred - y_true)
    loss = K.sum(loss, axis=1)
    return loss

# Model
def get_model():
    inputs = keras.Input(shape=(10))    
    outputs = layers.Dense(1, activation="linear")(inputs)
    model = keras.Model(inputs=inputs, outputs=outputs)
    
    model.compile(
        optimizer="rmsprop",
        loss=my_mse,
    )
    return model

数据集

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(8)

epochs = 2

print ("Runs 1")
for e in range(epochs):
  for i, (x, y) in enumerate(train_dataset):
    print (e, i, y)

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(8)
print ("Runs 2")
for e in range(epochs):
  for i, (x, y) in enumerate(train_dataset):
    print (e, i, y)

输出:

Runs 1
0 tf.Tensor([1. 3. 5. 7. 4. 0. 8. 2.], shape=(8,), dtype=float32)
1 tf.Tensor([ 6. 11. 10. 14.  9. 12. 13.], shape=(7,), dtype=float32)
2 tf.Tensor([4. 2. 5. 8. 1. 9. 7. 3.], shape=(8,), dtype=float32)
3 tf.Tensor([13. 10.  0. 14.  6. 11. 12.], shape=(7,), dtype=float32)
4 tf.Tensor([ 0.  1.  5.  6.  9.  3.  7. 14.], shape=(8,), dtype=float32)
5 tf.Tensor([13.  8.  4. 10.  2. 12. 11.], shape=(7,), dtype=float32)
Runs 2
0 tf.Tensor([1. 3. 5. 7. 4. 0. 8. 2.], shape=(8,), dtype=float32)
1 tf.Tensor([ 6. 11. 10. 14.  9. 12. 13.], shape=(7,), dtype=float32)
2 tf.Tensor([4. 2. 5. 8. 1. 9. 7. 3.], shape=(8,), dtype=float32)
3 tf.Tensor([13. 10.  0. 14.  6. 11. 12.], shape=(7,), dtype=float32)
4 tf.Tensor([ 0.  1.  5.  6.  9.  3.  7. 14.], shape=(8,), dtype=float32)
5 tf.Tensor([13.  8.  4. 10.  2. 12. 11.], shape=(7,), dtype=float32)

是的,用种子复制了订单。

现在让我们写一个方法将数据集转发到某个时期和批次组合

def forward(dataset, n=None):
  if not n:
    return dataset

  i = 0  
  while True:
    for _ in dataset:        
        i += 1
        if i == n:
          return dataset

测试用例:

让运行正常,遵守顺序

开头的数据

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), None)

model = get_model()
model.fit(train_dataset, epochs=3, verbose=0, workers=4, shuffle=False)

输出:

[7 3 6 10]
[11 0 1 2]
[8 14 9 13]
[12 5 4]
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]

来自Dataset第n个状态的数据

将我们的数据集转发到第 4 次迭代和 运行 训练

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), 4)

model = get_model()
model.fit(train_dataset, epochs=3, verbose=0, workers=4, shuffle=False)

输出:

[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]

很好,现在我们知道如何正确转发数据集了。现在让我们编写回调来跟踪当前迭代次数:

跟踪迭代的自定义回调(epoch-batch 组合)

现在我们需要确定模型检查点所在的时代和批次组合。如果我们有这些信息,我们可以加载最后一个检查点模型并将我们的数据集转发到它的批次和时期组合并继续训练。我们将使用回调

来做到这一点
class MyCustomCallback(tf.keras.callbacks.ModelCheckpoint, keras.callbacks.Callback):
    def __init__(self, the_id=0, **args):
      self.the_id = the_id
      self.epoch = 0
      super().__init__(**args)

    def _save_model(self, epoch, logs):
      logs['the_id'] = self.the_id
      super()._save_model(epoch, logs)

    def on_batch_end(self, batch, logs={}):
      self.the_id += 1
      super().on_batch_end(batch, logs)

checkpoint_filepath = 'checkpoint-{the_id}'
model_checkpoint_callback = MyCustomCallback(
    filepath=checkpoint_filepath,
    save_freq=2,
    save_best_only=False)

model = get_model()

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), None)

model.fit(train_dataset, epochs=5, verbose=0, callbacks=[model_checkpoint_callback], workers=4, shuffle=False)

输出:

[7 3 6 10]
[11 0 1 2]
[8 14 9 13]
[12 5 4]
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]

我们每两批检查一次。所以让我们假设它崩溃了,最后一个检查点是 checkpoint-4。我们可以加载这个模型并将我们的数据集转发到 4 并继续训练。

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), 4)

model = get_model()
model.fit(train_dataset, epochs=2, verbose=0, workers=4, shuffle=False)

输出:

[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]