#!/usr/bin/env python3
#
# td-q-learning-ai.py
#
# Using the FrozenLake environment from Farama's Gymnasium
#
# https://gymnasium.farama.org/environments/toy_text/frozen_lake/
#
import numpy as np
import gymnasium as gym
import sys
import argparse
import logging
import os.path
import joblib
class QTable:
    """
    Tabular action-value function for Q-learning over discrete
    state and action spaces.
    """
def __init__(self, n_states, n_actions):
"""
n_states: integer; number of states
n_actions: integer; number of actions
"""
self.Q = self.create_Q_table(n_states, n_actions)
return
def n_states(self):
"""
"""
return self.Q.shape[0]
def n_actions(self):
"""
"""
return self.Q.shape[1]
def actions(self):
"""
"""
return [action for action in range(self.n_actions())]
def create_Q_table(self, n_states, n_actions):
"""
n_states: integer; number of states
n_actions: integer; number of actions
"""
Q = np.zeros([n_states, n_actions])
return Q
def update(self, state, action, next_state, reward, alpha, gamma):
"""
state: integer; state index
action: numpy.int64; action index
next_state: integer; state index
reward: float; immediate reward
alpha: float; immediate vs historical weight
gamma: float; future discount factor
"""
self.Q[state, action] = (1.0-alpha)*self.Q[state, action] + alpha*(reward + gamma*np.max(self.Q[next_state, :]))
return
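    # Note: np.argmax returns the first index among ties, so a state whose
    # row is still all zeros always yields action 0.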
def get_best_action(self, state):
"""
state: integer; state index
"""
return np.argmax(self.Q[state, :])
def get_Q_value(self, state, action):
"""
state: integer; state index
action: numpy.int64; action index
"""
return self.Q[state, action]
def get_best_action_value(self, state):
"""
state: integer; state index
"""
best_action = self.get_best_action(state)
best_Q_value = self.Q[state, best_action]
return best_action, best_Q_value
def save(self, model_file):
joblib.dump(self.Q, model_file)
return
def load(self, model_file):
self.Q = joblib.load(model_file)
return
def get_model_filename(model_file, environment_name):
if model_file == "":
model_file = "{}-model.joblib".format(environment_name)
return model_file
# Load the Gymnasium environment
def load_environment(my_args):
    # in Gymnasium, the render mode must be chosen when the environment is
    # created; "ansi" makes env.render() return a text view of the board
    render_mode = "ansi" if my_args.track_steps else None
    if my_args.environment == 'lake':
        env = gym.make('FrozenLake-v1', render_mode=render_mode)
    else:
        raise Exception("Unexpected environment: {}".format(my_args.environment))
    # env.observation_space.n and env.action_space.n give the number of
    # states and actions in the loaded environment
    return env
def learn_epoch(Q, env, chance_epsilon, alpha, gamma, my_args):
action_list = Q.actions()
# Reset environment, getting initial state
state, info = env.reset()
epoch_total_reward = 0
epoch_done = False
epoch_truncated = False
# The Q-Table temporal difference learning algorithm
while (not epoch_done) and (not epoch_truncated):
# Choose action from Q table
# To facilitate learning, have chance of random action
# instead of always choosing the best action
chance = np.random.sample(1)[0]
if chance < chance_epsilon:
action = np.random.choice(action_list)
else:
action = Q.get_best_action(state)
# Take action, get the new state and reward
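        # (in Gymnasium, step() returns observation, reward, terminated,
        # truncated, and info)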
next_state, reward, epoch_done, epoch_truncated, info = env.step(action)
if my_args.track_steps:
            print(env.render())
# Update Q-Table with new data
Q.update(state, action, next_state, reward, alpha, gamma)
epoch_total_reward += reward
state = next_state
return state, epoch_total_reward
def evaluate_epoch(Q, env, my_args):
action_list = Q.actions()
# Reset environment, getting initial state
state, info = env.reset()
epoch_total_reward = 0
epoch_done = False
epoch_truncated = False
# The Q-Table policy evaluation
while (not epoch_done) and (not epoch_truncated):
# Choose action from Q table
action = Q.get_best_action(state)
# Take action, get the new state and reward
next_state, reward, epoch_done, epoch_truncated, info = env.step(action)
if my_args.track_steps:
            print(env.render())
# Update reward and state
epoch_total_reward += reward
state = next_state
return state, epoch_total_reward
def Q_learn(Q, env, my_args):
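    # epsilon_chance_factor serves double duty: it is the initial chance of
    # a random action and the factor by which that chance decays on success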
almost_one = my_args.epsilon_chance_factor
alpha = my_args.alpha
gamma = my_args.gamma
    epoch_rewards = [] # reward earned in each epoch
chance_epsilon = almost_one
for epoch_number in range(my_args.n_epochs):
state, epoch_total_reward = learn_epoch(Q, env, chance_epsilon, alpha, gamma, my_args)
epoch_rewards.append(epoch_total_reward)
if my_args.track_epochs:
print("epoch: {} reward: {}".format(epoch_number, epoch_total_reward))
        # decay epsilon so random exploration becomes less likely over time;
        # assumes a positive reward means the epoch completed successfully
if epoch_total_reward > 0:
chance_epsilon *= almost_one
return epoch_rewards
def Q_evaluate(Q, env, my_args):
    epoch_rewards = [] # reward earned in each epoch
for epoch_number in range(my_args.n_epochs):
state, epoch_total_reward = evaluate_epoch(Q, env, my_args)
epoch_rewards.append(epoch_total_reward)
if my_args.track_epochs:
print("epoch: {} reward: {}".format(epoch_number, epoch_total_reward))
return epoch_rewards
def do_learn(my_args):
# Load Environment
env = load_environment(my_args)
# Build new Q-table structure
# assumes that the environment has discrete observation and action spaces
Q = QTable(env.observation_space.n, env.action_space.n)
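    # for the default 4x4 FrozenLake map this is a 16-state x 4-action table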
# Learn
epoch_rewards = Q_learn(Q, env, my_args)
print("Learn: Average reward on all epochs " + str(sum(epoch_rewards)/my_args.n_epochs))
model_file = get_model_filename(my_args.model_file, my_args.environment)
Q.save(model_file)
print("Model saved to {}.".format(model_file))
return
def do_score(my_args):
# Load Environment
env = load_environment(my_args)
    # Load existing Q-Table; the placeholder dimensions are replaced by the
    # table read from the model file
    Q = QTable(0, 0)
model_file = get_model_filename(my_args.model_file, my_args.environment)
print("Model loading from {}.".format(model_file))
Q.load(model_file)
# Evaluate model
epoch_rewards = Q_evaluate(Q, env, my_args)
print("Score: Average reward on all epochs " + str(sum(epoch_rewards)/my_args.n_epochs))
return
def parse_args(argv):
parser = argparse.ArgumentParser(prog=argv[0], description='Q-Table Learning')
parser.add_argument('action', default='learn',
choices=[ "learn", "score", ],
nargs='?', help="desired action")
    parser.add_argument('--environment', '-e', default="lake", type=str, choices=('lake', ), help="name of the Gymnasium environment")
parser.add_argument('--model-file', '-m', default="", type=str, help="name of file for the model (default is constructed from environment)")
#
# hyper parameters
#
    parser.add_argument('--alpha', '-a', default=0.5, type=float, help="temporal-difference learning rate: weight of new vs historical estimates (default=0.5)")
    parser.add_argument('--gamma', '-g', default=0.5, type=float, help="Q-learning discount factor for future rewards (default=0.5)")
    parser.add_argument('--epsilon-chance-factor', '-c', default=0.1, type=float, help="initial chance of choosing a random action while learning; also the per-success decay factor (default=0.1)")
    parser.add_argument('--n-epochs', '-n', default=10, type=int, help="number of epochs to run (default=10)")
# debugging/observations
parser.add_argument('--track-epochs', '-t', default=0, type=int, help="0 = don't display per-epoch information, 1 = do display per-epoch information (default=0)")
parser.add_argument('--track-steps', '-s', default=0, type=int, help="0 = don't display per-step information, 1 = do display per-step information (default=0)")
my_args = parser.parse_args(argv[1:])
#
# Do any special fixes/checks here
#
return my_args
def main(argv):
my_args = parse_args(argv)
# logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.WARN)
if my_args.action == 'learn':
do_learn(my_args)
elif my_args.action == 'score':
do_score(my_args)
else:
raise Exception("Action: {} is not known.".format(my_args.action))
return
if __name__ == "__main__":
main(sys.argv)
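
# Example usage (hyperparameter values are illustrative, not tuned):
#   python3 td-q-learning-ai.py learn --n-epochs 10000 --gamma 0.9
#   python3 td-q-learning-ai.py score --n-epochs 100 --track-epochs 1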