# td-q-learning-ai-step.py
#!/usr/bin/env python3
#
import numpy as np
import gymnasium as gym
import sys
import argparse
import logging
import os.path
import joblib
import tensorflow as tf
import tensorflow.keras as keras
import pandas as pd
import random
class QLeft:
    """
    Fixed baseline policy: push left on every step.

    Exposes the same get_best_action(state) method as the learnable
    Q-function so it can be handed to the evaluation loop unchanged.
    """

    def __init__(self):
        # Stateless policy: nothing to set up.
        pass

    def get_best_action(self, state):
        """
        state: ignored; accepted only so the call signature matches other policies
        returns
        action: always 0 (left)
        """
        return 0
class QRight:
    """
    Fixed baseline policy: push right on every step.

    Exposes the same get_best_action(state) method as the learnable
    Q-function so it can be handed to the evaluation loop unchanged.
    """

    def __init__(self):
        # Stateless policy: nothing to set up.
        pass

    def get_best_action(self, state):
        """
        state: ignored; accepted only so the call signature matches other policies
        returns
        action: always 1 (right)
        """
        return 1
class QRandom:
    """
    Baseline policy that picks left (0) or right (1) with equal probability.

    Exposes the same get_best_action(state) method as the learnable
    Q-function so it can be handed to the evaluation loop unchanged.
    """

    def __init__(self):
        # Stateless policy: draws come from the module-level `random` generator.
        pass

    def get_best_action(self, state):
        """
        state: ignored; accepted only so the call signature matches other policies
        returns
        action: 0 or 1, drawn uniformly
        """
        both_actions = (0, 1)
        return random.choice(both_actions)
class QFunction:
    """
    Learnable policy function, where the state-space is continuous, but the action space is discrete.

    A small dense Keras network maps one state vector to one estimated
    quality value per action.
    """

    def __init__(self, state_shape, n_actions):
        """
        state_shape: tuple of integers; number of state variables in each dimension
        n_actions: integer; number of actions
        """
        self.Q = self.create_Q_function(state_shape, n_actions)
        self.state_shape = state_shape
        self.num_actions = n_actions

    def n_actions(self):
        """
        returns
        n_actions: integer; the number of actions given at construction
        """
        return self.num_actions

    def actions(self):
        """
        returns
        actions: list of integers; every action index from 0 to n_actions-1
        """
        return list(range(self.n_actions()))

    def create_Q_function(self, state_shape, n_actions):
        """
        state_shape: tuple of integers; number of state variables in each dimension
        n_actions: integer; number of actions
        returns
        model: compiled keras Sequential model (mse loss, Adam optimizer)

        For CartPole, a state has 4 floating point variables.
        Input shape is (4,), meaning a list (or 1-d tensor) with 4 items.
        There are 2 actions: left, right
        Output shape is [*, 2], meaning a 2-d tensor, with the first
        dimension used for the number of predictions, and the second
        dimension being the action number (0-1).
        """
        model = keras.models.Sequential()
        model.add(keras.layers.Input(shape=state_shape))
        model.add(keras.layers.Dense(32, activation="elu"))
        model.add(keras.layers.Dense(16, activation="elu"))
        model.add(keras.layers.Dense(8, activation="elu"))
        # Linear output: Q-values are unbounded regression targets.
        model.add(keras.layers.Dense(n_actions, activation="linear"))
        model.compile(loss="mse", optimizer=keras.optimizers.Adam())
        return model

    def _as_batch(self, state):
        """
        state: np.array of shape self.state_shape
        returns
        batch: the same values reshaped to (-1, state.shape[0]), i.e. a batch holding one state
        """
        return state.reshape(-1, state.shape[0])

    def _predict_batch(self, state):
        """
        state: np.array of shape self.state_shape
        returns
        prediction: np.array of shape (1, n_actions) from the network
        """
        # verbose=0 matches the fit() call in update(): no progress bar per step.
        return self.Q.predict(self._as_batch(state), verbose=0)

    def update(self, state, action, next_state, reward, gamma, prediction, next_prediction, done):
        """
        Fit the network one step towards the temporal-difference target for (state, action).

        state: np.array of shape self.state_shape; the floating point values describing the current state
        action: np.int64; action index
        next_state: np.array of shape self.state_shape; the floating point values describing the next state
        reward: float; immediate reward
        gamma: float; future discount factor
        prediction: np.array of shape (1, n_actions); expected value of actions in state
        next_prediction: np.array of shape (1, n_actions); expected value of actions in next_state
        done: bool; whether the episode is done or not

        For CartPole:
        state is a np.array shape=[4] (x, x_dot, theta, theta_dot) of floats
        action is an integer (0,1)
        next_state is a np.array shape=[4] (x, x_dot, theta, theta_dot) of floats
        reward is a python float
        gamma is a python float
        prediction is a np.array of shape=[1,2]
        next_prediction is a np.array of shape=[1,2]
        done is a bool
        """
        # This is what we want the quality value for action in state to be.
        # (1.0 - done) zeroes the bootstrap term when the episode has ended.
        target_quality_value = reward + (1.0 - done) * gamma * np.max(next_prediction)
        # Work on a copy so the caller's prediction array is left untouched;
        # only the chosen action's entry is moved to the target.
        target_vec = np.array(prediction)
        target_vec[0][action] = target_quality_value
        # fit() needs batches of inputs and targets, so wrap the single state.
        self.Q.fit(self._as_batch(state), target_vec, epochs=1, verbose=0)

    def predict(self, state):
        """
        state: np.array of shape self.state_shape; the floating point values describing the current state
        returns
        prediction: np.array of shape (1, self.n_actions); the floating point predicted value of each action in the current state.

        For CartPole:
        state is a np.array shape=[4] (x, x_dot, theta, theta_dot) of floats
        prediction is a np.array shape=[1,2] ((left_score,right_score)) of floats
        """
        return self._predict_batch(state)

    def get_best_action(self, state):
        """
        state: np.array of shape self.state_shape; the floating point values describing the current state
        returns
        action: the action that is predicted to give the best score in the current state

        For CartPole:
        state is a np.array shape=[4] (x, x_dot, theta, theta_dot) of floats
        action is an integer (0,1) for (left,right)
        """
        prediction = self._predict_batch(state)
        return np.argmax(prediction[0])

    def get_Q_value(self, state, action):
        """
        state: np.array of shape self.state_shape; the floating point values describing the current state
        action: an integer; the action to give a value for
        returns
        q-value: a float; the predicted value for taking action in state

        For CartPole:
        state is a np.array shape=[4] (x, x_dot, theta, theta_dot) of floats
        action is an integer (0,1) for (left,right)
        q-value is a float
        """
        prediction = self._predict_batch(state)
        return prediction[0][action]

    def get_best_action_value(self, state):
        """
        state: np.array of shape self.state_shape; the floating point values describing the current state
        returns
        action: the action that is predicted to give the best score in the current state
        q-value: a float; the predicted value for taking that action in state

        For CartPole:
        state is a np.array shape=[4] (x, x_dot, theta, theta_dot) of floats
        action is an integer (0,1) for (left,right)
        q-value is a float
        """
        prediction = self._predict_batch(state)
        best_action = np.argmax(prediction[0])
        # Index with the action just selected, so the pair is consistent.
        best_Q_value = prediction[0][best_action]
        return best_action, best_Q_value

    def show(self):
        """
        Print the predicted action values for the all-zero state.

        The probe state is hard-coded with 4 variables (the CartPole layout).
        """
        state = np.array([[0.0, 0.0, 0.0, 0.0]])
        prediction = self.Q.predict(state, verbose=0)
        print(prediction)

    def save(self, model_file):
        """
        model_file: path handed to the keras model's save()
        """
        self.Q.save(model_file)

    def load(self, model_file):
        """
        Replace the network with the model stored in model_file.

        model_file: path handed to keras.models.load_model()

        Only self.Q is replaced; state_shape and num_actions keep the
        values given at construction.
        """
        self.Q = keras.models.load_model(model_file)
def get_model_filename(model_file, environment_name):
    """
    Resolve the path used to store or load the model.

    model_file: string; explicit file name, or "" to request the default
    environment_name: string; used to build the default name
    returns
    filename: model_file when non-empty, otherwise "<environment_name>-model.keras"
    """
    if model_file != "":
        return model_file
    return "{}-model.keras".format(environment_name)
def get_rewards_filename(model_file, environment_name):
    """
    Resolve the path used to store the per-epoch rewards.

    model_file: string; explicit rewards file name, or "" to request the default
    environment_name: string; used to build the default name
    returns
    filename: model_file when non-empty, otherwise "<environment_name>-rewards.csv"
    """
    if model_file != "":
        return model_file
    return "{}-rewards.csv".format(environment_name)
def load_environment(my_args):
    """
    Create the gymnasium environment selected on the command line.

    my_args: parsed arguments; reads my_args.track_steps and my_args.environment
    returns
    env: the environment built by gym.make('CartPole-v1', ...)
    raises
    Exception: when my_args.environment is anything other than 'cart'
    """
    # Per-step tracking is implemented by asking the environment to render to screen.
    render_mode = "human" if my_args.track_steps else None
    if my_args.environment != 'cart':
        raise Exception("Unexpected environment: {}".format(my_args.environment))
    return gym.make('CartPole-v1', render_mode=render_mode)
def learn_epoch(Q, env, chance_epsilon, gamma, my_args):
    """
    Run one training episode, updating Q after every environment step.

    Q: object providing actions(), predict(), get_best_action() and update()
    env: environment providing reset() and step()
    chance_epsilon: float; probability of taking a random action on a step
    gamma: float; future discount factor passed through to Q.update()
    my_args: parsed arguments; not read here
    returns
    state: the last state reached
    epoch_total_reward: sum of the rewards collected during the episode
    """
    candidate_actions = Q.actions()
    current_state, _ = env.reset()
    current_prediction = Q.predict(current_state)
    total_reward = 0
    terminated = truncated = False

    # Temporal-difference loop: act, observe, update, advance.
    while not (terminated or truncated):
        # Epsilon-greedy: occasionally explore instead of exploiting.
        explore = np.random.sample(1)[0] < chance_epsilon
        if explore:
            action = np.random.choice(candidate_actions)
        else:
            action = Q.get_best_action(current_state)

        following_state, reward, terminated, truncated, _ = env.step(action)
        following_prediction = Q.predict(following_state)

        # Only `terminated` is passed as the done flag; truncation is not.
        Q.update(current_state, action, following_state, reward, gamma,
                 current_prediction, following_prediction, terminated)

        total_reward += reward
        current_state, current_prediction = following_state, following_prediction

    return current_state, total_reward
def evaluate_epoch(Q, env, my_args):
    """
    Run one episode following Q's best action at every step, without learning.

    Q: object providing get_best_action(state)
    env: environment providing reset() and step()
    my_args: parsed arguments; not read here
    returns
    state: the last state reached
    epoch_total_reward: sum of the rewards collected during the episode
    """
    current_state, _ = env.reset()
    total_reward = 0
    terminated = truncated = False

    while not (terminated or truncated):
        chosen = Q.get_best_action(current_state)
        current_state, reward, terminated, truncated, _ = env.step(chosen)
        total_reward += reward

    return current_state, total_reward
def Q_learn(Q, env, my_args):
    """
    Train Q for my_args.n_epochs episodes.

    Q: learnable Q-function handed to learn_epoch()
    env: environment handed to learn_epoch()
    my_args: parsed arguments; reads epsilon_chance_factor, gamma, n_epochs, track_epochs
    returns
    epoch_rewards: list with the total reward of each episode, in order
    """
    decay_factor = my_args.epsilon_chance_factor
    discount = my_args.gamma
    # The exploration chance starts at the factor itself and is multiplied
    # by it again after each sufficiently good episode.
    exploration_chance = decay_factor
    rewards = []

    for epoch_number in range(my_args.n_epochs):
        _, episode_reward = learn_epoch(Q, env, exploration_chance, discount, my_args)
        rewards.append(episode_reward)

        if my_args.track_epochs:
            print("epoch: {} reward: {}".format(epoch_number, episode_reward))
            sys.stdout.flush()

        # Experiment less once an episode scores above 40, but never below 1%.
        # Assumes positive scores for successful completion.
        if episode_reward > 40:
            exploration_chance = max(exploration_chance * decay_factor, 0.01)

    return rewards
def Q_evaluate(Q, env, my_args):
    """
    Score a policy over my_args.n_epochs episodes without learning.

    Q: policy object handed to evaluate_epoch()
    env: environment handed to evaluate_epoch()
    my_args: parsed arguments; reads n_epochs and track_epochs
    returns
    epoch_rewards: list with the total reward of each episode, in order
    """
    rewards = []
    for epoch_number in range(my_args.n_epochs):
        _, episode_reward = evaluate_epoch(Q, env, my_args)
        rewards.append(episode_reward)
        if my_args.track_epochs:
            print("epoch: {} reward: {}".format(epoch_number, episode_reward))
    return rewards
def do_learn(my_args):
    """
    Train a Q-function, then save the model and the per-epoch rewards.

    my_args: parsed arguments; reads model_file, rewards_file, environment
             (and whatever load_environment() and Q_learn() read)

    An existing model file is loaded first, so training continues from it.
    """
    env = load_environment(my_args)
    try:
        # Build new Q-function structure.
        # Assumes that the environment has Box observation space and discrete action space.
        Q = QFunction(env.observation_space.shape, env.action_space.n)

        model_file = get_model_filename(my_args.model_file, my_args.environment)
        if os.path.exists(model_file):
            print("Model loading from {}.".format(model_file))
            Q.load(model_file)

        epoch_rewards = Q_learn(Q, env, my_args)
    finally:
        # Release the environment (and any render window) even if training fails.
        env.close()

    # Guard the average so that running zero epochs does not divide by zero.
    average_reward = sum(epoch_rewards) / len(epoch_rewards) if epoch_rewards else 0.0
    print("Learn: Average reward on all epochs " + str(average_reward))

    Q.save(model_file)
    print("Model saved to {}.".format(model_file))

    rewards_file = get_rewards_filename(my_args.rewards_file, my_args.environment)
    # Build the table in one call instead of growing it row by row.
    df = pd.DataFrame({
        "epoch": range(len(epoch_rewards)),
        "reward": epoch_rewards,
    })
    df.to_csv(rewards_file, index=False)
def do_score(my_args):
    """
    Load a saved model and report its average reward over the evaluation episodes.

    my_args: parsed arguments; reads model_file, environment
             (and whatever load_environment() and Q_evaluate() read)
    """
    env = load_environment(my_args)
    try:
        # Size the Q-function from the environment, as do_learn() does, rather
        # than from a zero-sized placeholder: some Keras versions may reject a
        # zero-unit layer, and this keeps the action count correct after load().
        # Assumes that the environment has Box observation space and discrete action space.
        Q = QFunction(env.observation_space.shape, env.action_space.n)

        model_file = get_model_filename(my_args.model_file, my_args.environment)
        print("Model loading from {}.".format(model_file))
        Q.load(model_file)

        epoch_rewards = Q_evaluate(Q, env, my_args)
    finally:
        # Release the environment (and any render window) even if scoring fails.
        env.close()

    # Guard the average so that running zero epochs does not divide by zero.
    average_reward = sum(epoch_rewards) / len(epoch_rewards) if epoch_rewards else 0.0
    print("Score: Average reward on all epochs " + str(average_reward))
def do_left(my_args):
    """
    Report the average reward of the always-left baseline policy.

    my_args: parsed arguments; passed to load_environment() and Q_evaluate()
    """
    env = load_environment(my_args)
    try:
        epoch_rewards = Q_evaluate(QLeft(), env, my_args)
    finally:
        # Release the environment (and any render window) even if evaluation fails.
        env.close()

    # Guard the average so that running zero epochs does not divide by zero.
    average_reward = sum(epoch_rewards) / len(epoch_rewards) if epoch_rewards else 0.0
    print("Score: Average reward on all epochs " + str(average_reward))
def do_right(my_args):
    """
    Report the average reward of the always-right baseline policy.

    my_args: parsed arguments; passed to load_environment() and Q_evaluate()
    """
    env = load_environment(my_args)
    try:
        epoch_rewards = Q_evaluate(QRight(), env, my_args)
    finally:
        # Release the environment (and any render window) even if evaluation fails.
        env.close()

    # Guard the average so that running zero epochs does not divide by zero.
    average_reward = sum(epoch_rewards) / len(epoch_rewards) if epoch_rewards else 0.0
    print("Score: Average reward on all epochs " + str(average_reward))
def do_random(my_args):
    """
    Report the average reward of the uniformly random baseline policy.

    my_args: parsed arguments; passed to load_environment() and Q_evaluate()
    """
    env = load_environment(my_args)
    try:
        epoch_rewards = Q_evaluate(QRandom(), env, my_args)
    finally:
        # Release the environment (and any render window) even if evaluation fails.
        env.close()

    # Guard the average so that running zero epochs does not divide by zero.
    average_reward = sum(epoch_rewards) / len(epoch_rewards) if epoch_rewards else 0.0
    print("Score: Average reward on all epochs " + str(average_reward))
def parse_args(argv):
    """
    Parse the command line.

    argv: list of strings; argv[0] is the program name, the rest are arguments
    returns
    my_args: argparse namespace with action, environment, model_file,
             rewards_file, gamma, epsilon_chance_factor, n_epochs,
             track_epochs and track_steps
    """
    parser = argparse.ArgumentParser(prog=argv[0], description='Q-Table Learning')

    # Optional positional: what to do.
    parser.add_argument(
        'action',
        nargs='?',
        default='learn',
        choices=["learn", "score", "left", "right", "random"],
        help="desired action",
    )

    # (flags, settings) pairs, registered in this order.
    option_table = (
        # environment and files
        (('--environment', '-e'),
         dict(default="cart", type=str, choices=('cart', ),
              help="name of the OpenAI gym environment")),
        (('--model-file', '-m'),
         dict(default="", type=str,
              help="name of file for the model (default is constructed from environment)")),
        (('--rewards-file', '-r'),
         dict(default="", type=str,
              help="name of file for the rewards (default is constructed from environment)")),
        # hyper parameters
        (('--gamma', '-g'),
         dict(default=0.5, type=float,
              help="Q-learning hyper parameter (default=0.5)")),
        (('--epsilon-chance-factor', '-c'),
         dict(default=0.1, type=float,
              help="Scaling factor for learning policy chance of choosing random action (default=0.1)")),
        (('--n-epochs', '-n'),
         dict(default=10, type=int,
              help="number of episodes to run (default=10).")),
        # debugging/observations
        (('--track-epochs', '-t'),
         dict(default=0, type=int,
              help="0 = don't display per-epoch information, 1 = do display per-epoch information (default=0)")),
        (('--track-steps', '-s'),
         dict(default=0, type=int,
              help="0 = don't display per-step information, 1 = do display per-step information (default=0)")),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)

    return parser.parse_args(argv[1:])
def main(argv):
    """
    Program entry point: parse argv and run the requested action.

    argv: list of strings; the full command line including the program name
    raises
    Exception: when the parsed action has no handler
    """
    my_args = parse_args(argv)
    logging.basicConfig(level=logging.WARN)

    # One handler per action name accepted by parse_args().
    handlers = {
        'learn': do_learn,
        'score': do_score,
        'left': do_left,
        'right': do_right,
        'random': do_random,
    }
    handler = handlers.get(my_args.action)
    if handler is None:
        raise Exception("Action: {} is not known.".format(my_args.action))
    handler(my_args)
if __name__ == "__main__":
    # Script entry: hand the complete argv (program name included) to main().
    main(sys.argv)
# Last Updated 03/26/2024