Problem implementing temporal-difference learning with an actor-critic model
I implemented a simple actor-critic model in TensorFlow==2.3.1 to learn the CartPole environment, but it is not learning at all: the average score over every 50 episodes stays below 20. Can someone point out why the model is not learning?
My algorithm is based on the following pseudocode:
for every episode do
S <- starting state
for every step do
choose action A based on actor_policy P
take an action A
observe the reward R and new state S_new
calculate the error E based on the TD(0) scheme for the value function V:
if new state S_new is not terminal then
E <- R + discount_factor * V(S_new) - V(S)
else
E <- R - V(S)
end if
calculate the loss for the critic: Lc <- E^2
calculate the loss for the actor: La <- -E * ln(P(A, S))
calculate overall loss: L <- Lc + La
update the weights by gradient descent on L: w <- w - alpha * grad_w(L)
end for
end for
The discount factor and alpha are constants.
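In equation form (just restating the pseudocode above, with \gamma the discount factor and P(A \mid S) the probability the actor assigns to action A in state S):

E = R + \gamma V(S_{new}) - V(S), \qquad \text{or } E = R - V(S) \text{ if } S_{new} \text{ is terminal}
L_c = E^2, \qquad L_a = -E \ln P(A \mid S), \qquad L = L_c + L_a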
Here are my requirements:
box2d==2.3.10
gym==0.17.3
keras==2.4.3
matplotlib==3.3.3
numpy==1.19.4
scikit-learn==0.23.2
tensorflow==2.3.1
tensorflow-probability==0.11.1
tqdm==4.53.0
Finally, here is my code:
from typing import Optional

import math

import gym
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from tqdm import tqdm

CART_POLE_ACTIONS = [0, 1]
CART_POLE_OUTPUT_LEN = len(CART_POLE_ACTIONS)
CART_POLE_INPUTS = 4


class ActorCriticController:
    def __init__(self, environment, learning_rate: float, discount_factor: float, input_size: int,
                 output_size: int, h1_size: int, h2_size: int, actions: list):
        self.environment = environment
        self.discount_factor: float = discount_factor
        self.input_size = input_size
        self.output_size = output_size
        self.h1_size = h1_size
        self.h2_size = h2_size
        self.actions = actions
        self.optimizer: tf.keras.optimizers.Adam = tf.keras.optimizers.Adam(
            learning_rate=learning_rate)
        self.last_error_squared: float = 0.0
        self.model: tf.keras.Model = self.create_actor_critic_model()
        self.log_action_probability: Optional[tf.Tensor] = None
        self.tape: Optional[tf.GradientTape] = None

    def create_actor_critic_model(self) -> tf.keras.Model:
        inputs = tf.keras.Input(shape=(self.input_size,))
        hidden1 = tf.keras.layers.Dense(self.h1_size, activation='relu')(inputs)
        hidden2 = tf.keras.layers.Dense(self.h2_size, activation='relu')(hidden1)
        outputs_actor = tf.keras.layers.Dense(self.output_size, activation='softmax')(hidden2)
        outputs_critic = tf.keras.layers.Dense(1, activation='linear')(hidden2)
        model = tf.keras.Model(inputs=inputs, outputs=[outputs_actor, outputs_critic])
        return model

    def choose_action(self, state: np.ndarray) -> int:
        state = self.format_state(state)
        self.tape = tf.GradientTape()
        with self.tape:
            probs, _ = self.model(state)
            action = tfp.distributions.Categorical(probs=probs).sample(1)
            index = self.actions.index(int(action))
            self.log_action_probability = math.log(probs[0][index], math.e)
        return int(action)

    def learn(self, state: np.ndarray, reward: float, new_state: np.ndarray, terminal: bool):
        state = self.format_state(state)
        new_state = self.format_state(new_state)
        with self.tape:
            _, critic_value = self.model(state)
            _, new_critic_value = self.model(new_state)
            error = reward - critic_value
            if not terminal:
                error += self.discount_factor * new_critic_value
            self.last_error_squared = float(error) ** 2
            loss = self.last_error_squared - error * self.log_action_probability
        gradients = self.tape.gradient(loss, self.model.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_weights))

    @staticmethod
    def format_state(state: np.ndarray) -> np.ndarray:
        return np.reshape(state, (1, state.size))


def main() -> None:
    environment = gym.make('CartPole-v1')
    controller = ActorCriticController(environment=environment,
                                       learning_rate=0.00001,
                                       discount_factor=0.99,
                                       input_size=CART_POLE_INPUTS,  # 4
                                       output_size=CART_POLE_OUTPUT_LEN,  # 2
                                       h1_size=128,  # 1024, 128
                                       h2_size=32,  # 256, 32
                                       actions=CART_POLE_ACTIONS)

    past_rewards = []
    past_errors = []
    for i_episode in tqdm(range(2000)):
        done = False
        state = environment.reset()
        reward_sum = 0.0
        errors_history = []

        while not done:
            environment.render()
            action = controller.choose_action(state)
            new_state, reward, done, info = environment.step(action)
            controller.learn(state, reward, new_state, done)
            state = new_state
            reward_sum += reward
            errors_history.append(controller.last_error_squared)

        print(f"reward_sum = {reward_sum}\n\n\n")
        past_rewards.append(reward_sum)
        past_errors.append(np.mean(errors_history))

        window_size = 50
        if i_episode % 25 == 0:
            if len(past_rewards) >= window_size:
                fig, axs = plt.subplots(2)
                axs[0].plot(
                    [np.mean(past_errors[i:i + window_size]) for i in range(len(past_errors) - window_size)],
                    'tab:red',
                )
                axs[0].set_title('mean squared error')
                axs[1].plot(
                    [np.mean(past_rewards[i:i + window_size]) for i in range(len(past_rewards) - window_size)],
                    'tab:green',
                )
                axs[1].set_title('sum of rewards')
                plt.savefig(f'learning_{i_episode}.png')
                plt.clf()

    environment.close()
    controller.model.save("final.model")


if __name__ == '__main__':
    main()
Thanks in advance for your help, I hope someone can tell me :)
I was able to fix your code. The main changes:
- replaced math.log() with tfp.distributions.Categorical.log_prob()
- changed how the error is computed
I am not entirely sure why it works this way, though, so I would welcome further clarification.
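My best guess as to why the first change matters (happy to be corrected): math.log() forces the probability tensor into a plain Python float, so the GradientTape loses the link between the log-probability and the network, and the actor's softmax head gets no gradient at all. tfp.distributions.Categorical.log_prob() keeps the result as a tf.Tensor recorded on the tape. The second change works the same way: your original loss used self.last_error_squared, a Python float, as the critic term, while the corrected version uses the tensor error ** 2. A minimal sketch of the difference, with a toy logits variable standing in for the network output:

import math

import tensorflow as tf
import tensorflow_probability as tfp

logits = tf.Variable([0.2, -0.1])  # stand-in for the actor's output

with tf.GradientTape() as tape:
    probs = tf.nn.softmax(logits)
    # math.log() converts the tensor to a Python float, so the tape
    # can no longer trace it back to `logits`.
    detached_log_p = math.log(float(probs[0]))
    detached_loss = tf.constant(-1.0 * detached_log_p)
print(tape.gradient(detached_loss, logits))  # None: no gradient reaches the policy

with tf.GradientTape() as tape:
    probs = tf.nn.softmax(logits)
    distribution = tfp.distributions.Categorical(probs=probs)
    log_p = distribution.log_prob(0)  # stays a tf.Tensor on the tape
    loss = -1.0 * log_p
print(tape.gradient(loss, logits))  # an actual gradient w.r.t. the logits

Here is the full corrected code: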
from typing import Optional

import gym
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from tqdm import tqdm


class ActorCriticController:
    def __init__(self, environment, learning_rate: float, discount_factor: float, input_size: int,
                 output_size: int, h1_size: int, h2_size: int) -> None:
        self.environment = environment
        self.discount_factor: float = discount_factor
        self.input_size = input_size
        self.output_size = output_size
        self.h1_size = h1_size
        self.h2_size = h2_size
        self.model: tf.keras.Model = self.create_actor_critic_model()
        self.optimizer: tf.keras.optimizers.Adam = tf.keras.optimizers.Adam(lr=learning_rate)
        self.log_action_probability: Optional[tf.Tensor] = None
        self.tape: Optional[tf.GradientTape] = None
        self.last_error_squared: float = 0.0

    def create_actor_critic_model(self) -> tf.keras.Model:
        inputs = tf.keras.Input(shape=(self.input_size,))
        hidden1 = tf.keras.layers.Dense(self.h1_size, activation='relu')(inputs)
        hidden2 = tf.keras.layers.Dense(self.h2_size, activation='relu')(hidden1)
        outputs_actor = tf.keras.layers.Dense(self.output_size, activation='softmax')(hidden2)
        outputs_critic = tf.keras.layers.Dense(1, activation='linear')(hidden2)
        model = tf.keras.Model(inputs=inputs, outputs=[outputs_actor, outputs_critic])
        return model

    def choose_action(self, state: np.ndarray) -> int:
        state = self.format_state(state)
        self.tape = tf.GradientTape()
        with self.tape:
            probs, _ = self.model(state)
            distribution = tfp.distributions.Categorical(probs=probs)
            action = distribution.sample()
            self.log_action_probability = distribution.log_prob(action)
        return int(action)

    # noinspection PyTypeChecker
    def learn(self, state: np.ndarray, reward: float, new_state: np.ndarray, terminal: bool):
        state = self.format_state(state)
        new_state = self.format_state(new_state)
        with self.tape:
            if not terminal:
                error = reward - self.model(state)[1] + self.discount_factor * \
                        self.model(new_state)[1]
            else:
                error = reward - self.model(state)[1]

            self.last_error_squared = float(error) ** 2

            actor_loss = -error * self.log_action_probability
            critic_loss = error ** 2
            loss = actor_loss + critic_loss
        gradients = self.tape.gradient(loss, self.model.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_weights))

    @staticmethod
    def format_state(state: np.ndarray) -> np.ndarray:
        return np.reshape(state, (1, state.size))


def main() -> None:
    environment = gym.make('CartPole-v1')
    controller = ActorCriticController(environment=environment,
                                       learning_rate=0.00006,
                                       discount_factor=0.99,
                                       input_size=4,
                                       output_size=2,
                                       h1_size=1024,  # 1024, 128
                                       h2_size=256)  # 256, 32

    past_rewards = []
    past_errors = []
    for i_episode in tqdm(range(2000)):
        done = False
        state = environment.reset()
        reward_sum = 0.0
        errors_history = []

        while not done:
            environment.render()
            action = controller.choose_action(state)
            new_state, reward, done, info = environment.step(action)
            controller.learn(state, reward, new_state, done)
            state = new_state
            reward_sum += reward
            errors_history.append(controller.last_error_squared)

        print(f"reward_sum = {reward_sum}\n\n\n")
        past_rewards.append(reward_sum)
        past_errors.append(np.mean(errors_history))

        if i_episode % 150 == 0:
            controller.model.save(f"model.{i_episode}")

        window_size = 30
        if i_episode % 100 == 0:
            if len(past_rewards) >= window_size:
                fig, axs = plt.subplots(2)
                axs[0].plot(
                    [np.mean(past_errors[i:i + window_size]) for i in
                     range(len(past_errors) - window_size)],
                    'tab:red',
                )
                axs[0].set_title('mean squared error')
                axs[1].plot(
                    [np.mean(past_rewards[i:i + window_size]) for i in
                     range(len(past_rewards) - window_size)],
                    'tab:green',
                )
                axs[1].set_title('sum of rewards')
                plt.savefig(f'plots/learning_{i_episode}.png')
                plt.clf()

    environment.close()
    controller.model.save(r"final.model")


if __name__ == '__main__':
    main()
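One more thing I am not 100% sure about: in learn() the TD error is built from both self.model(state)[1] and self.model(new_state)[1], so the squared error also pushes gradient through V(S_new). A common refinement, shown below as a hypothetical variant of the learn() method above (not something I needed to make your code work), is to stop gradients through the bootstrap target and to detach the error before it scales the log-probability:

def learn(self, state: np.ndarray, reward: float, new_state: np.ndarray, terminal: bool):
    state = self.format_state(state)
    new_state = self.format_state(new_state)
    with self.tape:
        _, value = self.model(state)
        target = reward
        if not terminal:
            # Bootstrap from V(S_new), but treat it as a constant target.
            _, new_value = self.model(new_state)
            target += self.discount_factor * tf.stop_gradient(new_value)
        error = target - value

        self.last_error_squared = float(error) ** 2

        # The actor only uses the error as a scaling factor (the advantage),
        # so detach it; the critic is trained by the squared error.
        actor_loss = -tf.stop_gradient(error) * self.log_action_probability
        critic_loss = error ** 2
        loss = actor_loss + critic_loss
    gradients = self.tape.gradient(loss, self.model.trainable_weights)
    self.optimizer.apply_gradients(zip(gradients, self.model.trainable_weights))

This keeps the critic regressing toward a fixed TD target and uses the error purely as an advantage weight for the policy update.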