基于批次而不是时期的指数衰减学习率

Exponential decay learning rate based on batches instead of epochs

与大多数调度程序所基于的不同,我想要基于时间步长而不是纪元的自适应学习率。我有一个模型:

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam

class DQNagent:

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.model = self.build_model()    # original model
        self.target_model = self.build_model()  # target model
        self.lr = 1e-2

    def build_model(self):
        
        x_in = layers.Input(shape=(self.step_size, self.state_size))
        x_out = layers.Dense(20, activation='relu')(x_in)
        output = layers.Dense(self.action_size, activation='linear')(x_out)
        
        self.learning_rate = CustomSchedule()
 
        opt = tf.keras.optimizers.Adam(self.learning_rate)
        model = Model(inputs=x_in, outputs=output_y, name="DQN")
        model.compile(loss=['mse'], optimizer=opt)
    

        return model

我想制作一个像这样的调度程序:

class CustomSchedule:
  def __init__(self, lr=1e-2):
    super(CustomSchedule, self).__init__()
    self.lr = lr
    self.t = 0

  def __call__(self):
    self.t +=1
    if self.t % 100 ==0:
        self.lr /= 10

    return self.lr

我没有声明所有内容的主要代码是这样的:

dqn = DQNagent(state_size, action_size)

for step in range(1000):
    states_all = np.array([[[0, 0, 1],[1,0,1], [0, -1, 1], [1,-1,1]]])
    Q_values =  dqn.model.predict(state_all)[0]
    
    # training
    batch = memory.sample(batch_size)
    batch_states = utils.get_states_user(batch) # assuming I have generated states using this method
    
    Q_states = dqn.model.predict(batch_states) # assuming I have sampled batch states
    
    dqn.model.fit(batch_states, Q_states, verbose =0)

我想安排我的学习率,如果我说 step%100==0 学习率降低为 learning_rate/10。似乎对于我创建的 CustomSchedule class,我将不得不重新编译 model,这似乎无法有效地保存和加载权重。我还有其他方法可以做到这一点吗?

编辑:

我已经按照@FedericoMalerba 的方式编辑了我的代码

创建了一个 decay_func 作为:

def decay_func(step, lr):

    return lr/10**(step/100)

然后我对 DQNAgent class 添加了以下更改:

class DQNAgent():
     def __init__(self, state_size, action_size):
     self.lr = 1e-2
     self.t_step = tf.Variable(0, trainable=False, name='Step', dtype=tf.int64)
     self.decaying_lr = partial(decay_func, step=self.step, lr=self.lr)
     
    def __call__(self):
        self.step.assign_add(1)
        return self.step

并在我的主代码中为每个步骤调用 dqn()。可调用 decaying_lr 被传递给 build_model() 中的优化器作为 opt = tf.keras.optimizers.Adam(self.decaying_lr)

您要找的是 Exponential Decay。您可以将其用作学习率:

initial_learning_rate = 0.1
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=50,
    decay_rate=0.1,
    staircase=True)

每 50 步,从 0.1 的学习率开始,学习率将除以 10。

这是一个有效的训练脚本,我打印了生成的学习率。我做了一个自定义回调,所以它在每批结束时打印学习率。

import tensorflow as tf
from tensorflow import keras
import numpy as np

(xtrain, ytrain), _ = keras.datasets.mnist.load_data()

xtrain = np.float32(xtrain/255)
ytrain = np.int32(ytrain)

def pre_process(inputs, targets):
    inputs = tf.expand_dims(inputs, -1)
    targets = tf.one_hot(targets, depth=10)
    return tf.divide(inputs, 255), targets

train_ds = tf.data.Dataset.from_tensor_slices((xtrain, ytrain)).\
    take(10_000).shuffle(100).batch(8).map(pre_process)

from tensorflow.keras.layers import Dense, Conv2D, MaxPool2D, Dropout, Flatten

model = tf.keras.Sequential([
    Conv2D(filters=16, kernel_size=(3, 3),
                            strides=(1, 1), input_shape=(28, 28, 1)),
    MaxPool2D(pool_size=(2, 2)),
    Conv2D(filters=32, kernel_size=(3, 3),
                            strides=(1, 1)),
    MaxPool2D(pool_size=(2, 2)),
    Flatten(),
    Dense(64, activation='relu'),
    Dropout(5e-1),
    Dense(10)])


class PrintLRCallback(tf.keras.callbacks.Callback):
    def on_batch_end(self, batch, logs=None):
        print(model.optimizer.iterations.numpy(),
              model.optimizer._decayed_lr(tf.float32).numpy())

initial_learning_rate = 0.1
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=50,
    decay_rate=0.1,
    staircase=True)

model.compile(loss='categorical_crossentropy',
              optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule))

history = model.fit(train_ds, verbose=0, 
                    callbacks=[PrintLRCallback()],
                    steps_per_epoch=250)

每 10 次迭代,我打印迭代次数和学习率:

10 0.1
20 0.1
30 0.1
40 0.1
50 0.01 
60 0.01 
70 0.01 
80 0.01 
90 0.01 
100 0.001
110 0.001
120 0.001
130 0.001
140 0.001
150 0.0001 
160 0.0001 
170 0.0001 
180 0.0001 
190 0.0001 

解决此问题的一般方法是创建一个不带参数的可调用对象(函数),并将其传递给您在 DQNagent.build_model() 中定义的 Adam 优化器。为此,请按照下列步骤操作:

  1. 创建您的 epsilon 衰减函数:
def decay_func(step_tensor, **other_arguments_your_func_needs):
    # body of the function. The input argument step tensor must be used
    # to determine the learning rate that will be returned
    return learning_rate
  1. 创建你的步进张量(必须是张量!!!):
step = tf.Variable(0, trainable=False, name='Step', dtype=tf.int64)
  1. 创建要传递给优化器的可调用对象:
from functools import partial

decaying_learning_rate = partial(decay_func, step_tensor=step, **other_arguments_your_func_needs)
  1. 在创建优化器时传递可调用对象:
opt = tf.keras.optimizers.Adam(decaying_learning_rate)
  1. 在你的训练循环中通过增加你的步长张量来迭代:
step.assing_add(1)

您实际上是在创建一个不带任何参数的可调用 decaying_learning_rate,因为所有参数都是由 functools.partial 调用提供给它的。 TensorFlow 优化器会意识到学习率不是一个数字,而是一个可调用对象,并且会这样调用它:

this_step_learning_rate = decaying_learning_rate()

由于张量是整个运行时的共享对象,因此当您使用 step.assing_add(1) 增加计步器时,这个新步骤将用于在下次调用时计算 decay_func 中的新学习率优化器所做的。即使您没有明确传递新的和更新的张量,也会发生这种情况。魔法!

顺便说一句,这正是 Exponential Decay 所做的。我在这里介绍的唯一一件事是一种通用的方法来定义您自己的 decay_func,以及如何让它像 TF 预先实现的指数衰减一样工作。