基于 actor-critic 实现时间差异的问题

Problem with implementing temporal difference based on actor-critic

我在Tensorflow==2.3.1中实现了一个简单的actor-critic模型来学习Cartpole环境。但这根本不是学习。每 50 集的平均分数低于 20。有人可以指出为什么模型没有学习吗?

我的算法基于以下伪代码:

for every episode do
    S <- starting state
    for every step do
        choose action A based on actor_policy P
        take an action A
        observe the reward R and new state S_new
        calculate error E based on TD(0) schema V:
        if new state S_new is not terminal then
            E <- R + discount_factor * V(S_new) - V(S)
        else
            E <- R - V(S))
        end if
        calculate the loss for the critic: Lc <- E^2
        calculate the loss for the actor:  La <- -E * ln(P(A, S))
        calculate overall loss: L <- Lc + La
        update the weights with the gradient: w <- w + alpha * grad(w) * L
    end for
end for

Discount factoralpha 是常量。以下是要求:

box2d==2.3.10
gym==0.17.3
keras==2.4.3
matplotlib==3.3.3
numpy==1.19.4
scikit-learn==0.23.2
tensorflow==2.3.1
tensorflow-probability==0.11.1
tqdm==4.53.0

最后,我的代码是:

from typing import Optional
import math

import gym
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from tqdm import tqdm

CART_POLE_ACTIONS = [0, 1]
CART_POLE_OUTPUT_LEN = len(CART_POLE_ACTIONS)
CART_POLE_INPUTS = 4


class ActorCriticController:
    def __init__(self, environment, learning_rate: float, discount_factor: float, input_size: int,
                 output_size: int, h1_size: int, h2_size: int, actions: list):
        self.environment = environment
        self.discount_factor: float = discount_factor
        self.input_size = input_size
        self.output_size = output_size
        self.h1_size = h1_size
        self.h2_size = h2_size
        self.actions = actions

        self.optimizer: tf.keras.optimizers.Adam = tf.keras.optimizers.Adam(
            learning_rate=learning_rate)
        self.last_error_squared: float = 0.0
        self.model: tf.keras.Model = self.create_actor_critic_model()
        self.log_action_probability: Optional[tf.Tensor] = None
        self.tape: Optional[tf.GradientTape] = None

    def create_actor_critic_model(self) -> tf.keras.Model:
        inputs = tf.keras.Input(shape=(self.input_size,))
        hidden1 = tf.keras.layers.Dense(self.h1_size, activation='relu')(inputs)
        hidden2 = tf.keras.layers.Dense(self.h2_size, activation='relu')(hidden1)
        outputs_actor = tf.keras.layers.Dense(self.output_size, activation='softmax')(hidden2)
        outputs_critic = tf.keras.layers.Dense(1, activation='linear')(hidden2)

        model = tf.keras.Model(inputs=inputs, outputs=[outputs_actor, outputs_critic])

        return model

    def choose_action(self, state: np.ndarray) -> int:
        state = self.format_state(state)

        self.tape = tf.GradientTape()
        with self.tape:
            probs, _ = self.model(state)
            action = tfp.distributions.Categorical(probs=probs).sample(1)
            index = self.actions.index(int(action))
            self.log_action_probability = math.log(probs[0][index], math.e)
        return int(action)

    def learn(self, state: np.ndarray, reward: float, new_state: np.ndarray, terminal: bool):
        state = self.format_state(state)
        new_state = self.format_state(new_state)

        with self.tape:
            _, critic_value = self.model(state)
            _, new_critic_value = self.model(new_state)

            error = reward - critic_value
            if not terminal:
                error += self.discount_factor * new_critic_value

            self.last_error_squared = float(error) ** 2
            loss = self.last_error_squared - error * self.log_action_probability

        gradients = self.tape.gradient(loss, self.model.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_weights))

    @staticmethod
    def format_state(state: np.ndarray) -> np.ndarray:
        return np.reshape(state, (1, state.size))


def main() -> None:
    environment = gym.make('CartPole-v1')
    controller = ActorCriticController(environment=environment,
                                       learning_rate=0.00001,
                                       discount_factor=0.99,
                                       input_size=CART_POLE_INPUTS,  # 4
                                       output_size=CART_POLE_OUTPUT_LEN,  # 2
                                       h1_size=128, # 1024, 128
                                       h2_size=32,  # 256, 32
                                       actions = CART_POLE_ACTIONS)

    past_rewards = []
    past_errors = []
    for i_episode in tqdm(range(2000)):
        done = False
        state = environment.reset()
        reward_sum = 0.0
        errors_history = []

        while not done:
            environment.render()

            action = controller.choose_action(state)
            new_state, reward, done, info = environment.step(action)
            controller.learn(state, reward, new_state, done)
            state = new_state
            reward_sum += reward
            errors_history.append(controller.last_error_squared)
        print(f"reward_sum = {reward_sum}\n\n\n")
        past_rewards.append(reward_sum)
        past_errors.append(np.mean(errors_history))

        window_size = 50
        if i_episode % 25 == 0:
            if len(past_rewards) >= window_size:
                fig, axs = plt.subplots(2)

                axs[0].plot(
                    [np.mean(past_errors[i:i + window_size]) for i in range(len(past_errors) - window_size)],
                    'tab:red',
                )
                axs[0].set_title('mean squared error')

                axs[1].plot(
                    [np.mean(past_rewards[i:i+window_size]) for i in range(len(past_rewards) - window_size)],
                    'tab:green',
                )
                axs[1].set_title('sum of rewards')
            plt.savefig(f'learning_{i_episode}.png')
            plt.clf()

    environment.close()
    controller.model.save("final.model")


if __name__ == '__main__':
    main()

提前感谢您的帮助。我希望有人会告诉我:)

我能够修复您的代码。主要变化:

  • math.log()替换为tfp.distributions.Categorical.log_prob()
  • 更改误差计算方法

但我不完全确定为什么它会这样工作,因此希望进一步说明。

from typing import Optional

import gym
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from tqdm import tqdm


class ActorCriticController:
    def __init__(self, environment, learning_rate: float, discount_factor: float, input_size: int,
                 output_size: int, h1_size: int, h2_size: int) -> None:
        self.environment = environment
        self.discount_factor: float = discount_factor
        self.input_size = input_size
        self.output_size = output_size
        self.h1_size = h1_size
        self.h2_size = h2_size

        self.model: tf.keras.Model = self.create_actor_critic_model()
        self.optimizer: tf.keras.optimizers.Adam = tf.keras.optimizers.Adam(lr=learning_rate)
        self.log_action_probability: Optional[tf.Tensor] = None
        self.tape: Optional[tf.GradientTape] = None
        self.last_error_squared: float = 0.0

    def create_actor_critic_model(self) -> tf.keras.Model:
        inputs = tf.keras.Input(shape=(self.input_size,))
        hidden1 = tf.keras.layers.Dense(self.h1_size, activation='relu')(inputs)
        hidden2 = tf.keras.layers.Dense(self.h2_size, activation='relu')(hidden1)

        outputs_actor = tf.keras.layers.Dense(self.output_size, activation='softmax')(hidden2)
        outputs_critic = tf.keras.layers.Dense(1, activation='linear')(hidden2)

        model = tf.keras.Model(inputs=inputs, outputs=[outputs_actor, outputs_critic])

        return model

    def choose_action(self, state: np.ndarray) -> int:
        state = self.format_state(state)

        self.tape = tf.GradientTape()
        with self.tape:
            probs, _ = self.model(state)
            distribution = tfp.distributions.Categorical(probs=probs)
            action = distribution.sample()
            self.log_action_probability = distribution.log_prob(action)
        return int(action)

    # noinspection PyTypeChecker
    def learn(self, state: np.ndarray, reward: float, new_state: np.ndarray, terminal: bool):
        state = self.format_state(state)
        new_state = self.format_state(new_state)

        with self.tape:
            if not terminal:
                error = reward - self.model(state)[1] + self.discount_factor * \
                        self.model(new_state)[1]
            else:
                error = reward - self.model(state)[1]

            self.last_error_squared = float(error) ** 2

            actor_loss = - error * self.log_action_probability
            critic_loss = error ** 2

            loss = actor_loss + critic_loss

        gradients = self.tape.gradient(loss, self.model.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_weights))

    @staticmethod
    def format_state(state: np.ndarray) -> np.ndarray:
        return np.reshape(state, (1, state.size))


def main() -> None:
    environment = gym.make('CartPole-v1')
    controller = ActorCriticController(environment=environment,
                                       learning_rate=0.00006,
                                       discount_factor=0.99,
                                       input_size=4,
                                       output_size=2,
                                       h1_size=1024,  # 1024, 128
                                       h2_size=256)  # 256, 32

    past_rewards = []
    past_errors = []
    for i_episode in tqdm(range(2000)):
        done = False
        state = environment.reset()
        reward_sum = 0.0
        errors_history = []

        while not done:
            environment.render()

            action = controller.choose_action(state)
            new_state, reward, done, info = environment.step(action)
            controller.learn(state, reward, new_state, done)
            state = new_state
            reward_sum += reward
            errors_history.append(controller.last_error_squared)
        print(f"reward_sum = {reward_sum}\n\n\n")
        past_rewards.append(reward_sum)
        past_errors.append(np.mean(errors_history))

        if i_episode % 150 == 0:
            controller.model.save(f"model.{i_episode}")

        window_size = 30
        if i_episode % 100 == 0:
            if len(past_rewards) >= window_size:
                fig, axs = plt.subplots(2)

                axs[0].plot(
                    [np.mean(past_errors[i:i + window_size]) for i in
                     range(len(past_errors) - window_size)],
                    'tab:red',
                )
                axs[0].set_title('mean squared error')

                axs[1].plot(
                    [np.mean(past_rewards[i:i + window_size]) for i in
                     range(len(past_rewards) - window_size)],
                    'tab:green',
                )
                axs[1].set_title('sum of rewards')
            plt.savefig(f'plots/learning_{i_episode}.png')
            plt.clf()

    environment.close()
    controller.model.save(r"final.model")


if __name__ == '__main__':
    main()