model.predict() == ValueError: Error when checking input: expected flatten_input to have 3 dimensions, but got array with shape (1, 2)
The idea behind this project is to eventually remove the "test" variables and use real data coming from sensors. The test environment works fine, but now I want to be able to use real data.
Two data points, integers from 1 to 100, are used as input: soil moisture and the chance of rain.
Bottom line:
I just want to feed in two numbers and get the model's best prediction for which action to take (and, if possible, a confidence percentage).
However, I get an error whenever I try to make a prediction.
Things I have tried:
pred = dqn.model.predict(np.array([30, 30]))
ValueError: Error when checking input: expected flatten_input to have 3 dimensions, but got array with shape (2, 1)
pred = dqn.model.predict(np.expand_dims(np.array([30, 30]), axis=0))
ValueError: Error when checking input: expected flatten_input to have 3 dimensions, but got array with shape (1, 2)
I have seen some other posts mention reshaping, but I am a bit burned out on this project and not sure whether that is the solution.
To make testing faster I have lowered some of the variables, but here it is. This is my current code:
import os
import random
from abc import ABC
import numpy as np
from gym import Env
from gym.spaces import Discrete, Box
from rl.agents import DQNAgent
from rl.memory import SequentialMemory
from rl.policy import BoltzmannQPolicy
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# Steps per episode
steps = 10000
# Number of episodes
episodes = 100
# Score requirement per episode
# Used for stats and to filter training data
score_requirement = 1000
# Creates a Model that emulates a Markov Decision Process
# Finite process -> steps
# Rewards good watering and punishes bad watering
# Action -> Observation -> Reward
class PlantEnv(Env, ABC):
    def __init__(self):
        # Actions = water: 0=(none), 1=(3 seconds), 2=(4 seconds), 3=(5 seconds), 4=(6 seconds)
        self.action_space = Discrete(5)
        # Starting Moisture
        moisture = 20 + random.randint(-10, 10)
        # Starting Chance of Rain
        chance_of_rain = 50 + random.randint(-50, 50)
        # Observations
        self.observation_space = Box(low=np.array([0, 0]), high=np.array([100, 100]), dtype=int)
        self.state = moisture, chance_of_rain
        # Number of water steps left
        self.water_length = steps

    def step(self, action):
        # Action section
        water = 0
        if action == 1:
            water = 2
        elif action == 2:
            water = 3
        elif action == 3:
            water = 4
        elif action == 4:
            water = 5
        # Retrieve previous state
        moisture, chance_of_rain = self.state
        # The water multiplier here greatly affects the scoring
        # 5 or 6 is the best with this setup
        moisture += (water * 5)
        self.water_length -= 1
        # Reward Section
        reward = 0
        # If moisture is in the ideal range
        if 40 <= moisture <= 60:
            reward = 2
        # If moisture is dry or wet
        elif 60 < moisture <= 80 or 20 <= moisture < 40:
            reward = 1
        # If moisture is really dry or really wet
        elif 80 < moisture <= 100 or 0 <= moisture < 20:
            reward = -1
        # If moisture is out of range entirely
        elif 100 < moisture or moisture < 0:
            reward = -2
        # Check if shower is done
        if self.water_length <= 0:
            done = True
        else:
            done = False
        # Apply noise to test program
        # Simulate real-life conditions: evaporation, water loss, rain
        # Not used in final program
        moistureLoss = random.randint(15, 25)
        moisture -= moistureLoss
        # Simulate chance of rain
        chance_of_rain = 50 + random.randint(-50, 50)
        xfactor = chance_of_rain + random.randint(-50, 50)
        if xfactor > 100:
            moisture += (10 + random.randint(0, 15))
        # Set placeholder for info
        info = {}
        # Save current state
        self.state = moisture, chance_of_rain
        # Return step information
        return self.state, reward, done, info

    def reset(self):
        # Reset test environment
        # Set starting moisture
        moisture = 50 + random.randint(-10, 10)
        # Set starting chance of rain
        chance_of_rain = 50 + random.randint(-50, 50)
        self.state = moisture, chance_of_rain
        # Reset test time
        self.water_length = steps
        return self.state
# # Builds a model using previously defined states and actions
# def build_model():
#     inputs = Input(shape=(1, 2), name="input")
#     inputsF = Flatten()(inputs)
#     common = Dense(24, activation="relu", name="state")(inputsF)
#     action = Dense(5, activation="softmax", name="action")(common)
#     critic = Dense(1, name="output")(common)
#     model = keras.Model(inputs=inputs, outputs=[action, critic])
#     return model
# Build Model
def build_model():
    model = Sequential()
    model.add(Flatten(input_shape=(1, 2)))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(48, activation='relu'))
    model.add(Dense(5, activation='linear'))
    return model
# Build Agent
def build_agent(model):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=1000, window_length=1)
    dqn = DQNAgent(model=model, memory=memory, policy=policy, nb_actions=5,
                   nb_steps_warmup=50, target_model_update=1e-3)
    return dqn
# Build Deep-Q-Network
def build_dqn(dqn):
    dqn.compile(Adam(learning_rate=1e-3), metrics=['mae', 'accuracy'])
    dqn.fit(env, nb_steps=2000, visualize=False, verbose=1)
    return dqn
# Create environment
env = PlantEnv()
# Store data to show scoring stats and to use for training.
accepted_scores = []
training_data = []
scores = []
good_episodes = 0
# Create episodes and initiate simulation
for episode in range(1, episodes + 1):
    observation = env.reset()
    done = False
    score = 0
    history = []
    prev_observation = []
    # Print starting moisture to compare to ending moisture
    # print("Start Moisture: {}%".format(observation[0]))

    while not done:
        action = env.action_space.sample()
        # Force action override for plant protection
        if observation[0] > 100:
            action = 0
        elif observation[0] < 0:
            action = 4
        observation, reward, done, info = env.step(action)
        score += reward
        if len(prev_observation) > 0:
            history.append([prev_observation, action])
        prev_observation = observation

    # # Print ending moisture to compare to starting moisture
    # # Then print Episode number and score
    # print("End Moisture : {}%".format(observation[0]))
    # print('Episode: {} Score:{}\n'.format(episode, score))

    # Gather scores for episodes scoring above requirement
    if score >= score_requirement:
        good_episodes += 1
        accepted_scores.append(score)
        for data in history:
            if data[1] == 1:
                output = [1]
            else:
                output = [0]
            training_data.append([data[0], output])

    scores.append(score)
# Print number of episodes above score requirement
if len(accepted_scores) > 0:
print("Average accepted score: ", np.mean(accepted_scores))
print("Median accepted score : ", np.median(accepted_scores))
print("Episodes above accepted score of {}: {}/{}\n".format(score_requirement, good_episodes, episodes))
# Build Model and print summary
model = build_model()
model.summary()
# # Save Model
# model.save('./testModel1', overwrite=True)
# print("Model saved.")
dqn = build_agent(model)
dqn = build_dqn(dqn)
scores = dqn.test(env, nb_episodes=1, visualize=False)
print(np.mean(scores.history['episode_reward']))
pred = dqn.model.predict(np.expand_dims(np.array([30, 30]), axis=0))
Make sure the input you feed the model has the correct shape. It expects a 3D tensor whose first dimension is the batch size: because the model starts with Flatten(input_shape=(1, 2)), each sample must have shape (1, 2), so the array you pass to predict() must have shape (batch_size, 1, 2). Try this:
test_array = np.random.random((1, 1, 2))
print('Test array --> ', test_array)
print('Predictions --> ', dqn.model.predict(test_array))
Test array --> [[[0.4636345 0.18498545]]]
Predictions --> [[-0.01383634 0.006188 0.03987967 0.03497294 0.02642388]]
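To go from your two real sensor readings straight to an action, the same reshaping applies: wrap the pair in a (1, 1, 2) array (batch size 1, window length 1, 2 features). The model returns one Q-value per action; argmax picks the action, and a softmax over the Q-values can serve as a rough confidence percentage (it is not a true probability). A minimal sketch, assuming the trained dqn from your script is still in scope:
import numpy as np

# Two real sensor readings: soil moisture and chance of rain
moisture, chance_of_rain = 30, 30

# Shape (batch_size, window_length, features) = (1, 1, 2)
obs = np.array([[[moisture, chance_of_rain]]], dtype=np.float32)

q_values = dqn.model.predict(obs)[0]      # five Q-values, one per watering action
best_action = int(np.argmax(q_values))    # index of the highest Q-value

# Softmax over the Q-values as a rough "confidence" score
exp_q = np.exp(q_values - q_values.max())
confidence = exp_q[best_action] / exp_q.sum()

print("Best action:", best_action)
print("Confidence : {:.1f}%".format(100 * confidence))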