# Tabular Reinforcement Learning (with discrete states and actions)

In this notebook we demonstrate how an RL agent can learn to navigate the grid world environment shown in the book using Q-learning.

In [None]:
# tabular Q-learning is so simple that we don't need an additional library
import random
import numpy as np
import matplotlib.pyplot as plt

## Grid Environment

The following class contains a simulation model of the small grid world environment you've seen in the book.

In [None]:
class Environment:
    """
    Simulation model of the small grid world (4 rows x 6 columns).

    States are ``(row, column)`` tuples. The agent always starts in the upper
    left corner ``(0, 0)``. An episode ends as soon as one of the terminal
    states is reached, at which point the environment resets itself.
    """

    def __init__(self):
        # episode ends if the agent dies or finds the money
        self.terminal_states = {(1, 2), (3, 1), (3, 5)}
        # immediate reward for each state (incl. unreachable states: 0)
        self.rewards = [[-1,       0,       0, -1, -1,  -1],
                        [-1,       0, -100000, -1,  0,  -1],
                        [-1,      -1,      -1, -1,  0,  -1],
                        [-1, -100000,       0, -1, -1, 100]]
        # filter all states that can actually be reached
        # (a reward of 0 marks a cell as unreachable)
        self.possible_states = [(i, j) for i, row in enumerate(self.rewards)
                                for j, reward in enumerate(row) if reward]
        # state transitions via actions (walking into a wall = staying in the same place)
        # transitions[action][i][j] is the state reached by taking `action` in state (i, j)
        self.possible_actions = ["right", "left", "up", "down"]
        self.transitions = {
            "right": [[(0, 0), (0, 1), (0, 2), (0, 4), (0, 5), (0, 5)],
                      [(1, 0), (1, 1), (1, 3), (1, 3), (1, 4), (1, 5)],
                      [(2, 1), (2, 2), (2, 3), (2, 3), (2, 4), (2, 5)],
                      [(3, 1), (3, 1), (3, 2), (3, 4), (3, 5), (3, 5)]],
            "left":  [[(0, 0), (0, 1), (0, 2), (0, 3), (0, 3), (0, 4)],
                      [(1, 0), (1, 1), (1, 2), (1, 2), (1, 4), (1, 5)],
                      [(2, 0), (2, 0), (2, 1), (2, 2), (2, 4), (2, 5)],
                      [(3, 0), (3, 0), (3, 2), (3, 3), (3, 3), (3, 4)]],
            "up":    [[(0, 0), (0, 1), (0, 2), (0, 3), (0, 4), (0, 5)],
                      [(0, 0), (1, 1), (1, 2), (0, 3), (1, 4), (0, 5)],
                      [(1, 0), (2, 1), (1, 2), (1, 3), (2, 4), (1, 5)],
                      [(2, 0), (2, 1), (3, 2), (2, 3), (3, 4), (2, 5)]],
            "down":  [[(1, 0), (0, 1), (0, 2), (1, 3), (0, 4), (1, 5)],
                      [(2, 0), (1, 1), (2, 2), (2, 3), (1, 4), (2, 5)],
                      [(3, 0), (3, 1), (2, 2), (3, 3), (2, 4), (3, 5)],
                      [(3, 0), (3, 1), (3, 2), (3, 3), (3, 4), (3, 5)]],
        }
        # check which actions per state actually make sense,
        # i.e., we don't want to let our agent run into walls (this just wastes time);
        # entry k holds the useful actions for self.possible_states[k]
        self.possible_actions_in_state = [
            [a for a in self.possible_actions if self.transitions[a][i][j] != (i, j)]
            for (i, j) in self.possible_states
        ]
        # get ready for the first episode
        self.episode = 0
        self.reset()

    def reset(self):
        """Put the agent back on the start state and count a new episode."""
        # at the beginning of each episode, the agent always starts in the upper left corner
        self.current_state = (0, 0)
        self.episode += 1

    def step(self, action):
        """
        This is the main function that runs in each time step.

        Inputs:
            - action [str]: action the agent took; must be one of self.possible_actions
        Returns:
            - reward [int]: immediate reward received for reaching the next state
            - next state [tuple(int, int)]: coordinates of the state that was reached;
              for a terminal step this is the terminal state itself, even though
              self.current_state has already been reset to the start state
            - done [bool]: whether the episode terminated and the environment was reset
        Raises:
            - KeyError: if action is not a key of self.transitions
        """
        # see where this action leads us
        row, col = self.current_state
        next_state = self.transitions[action][row][col]
        self.current_state = next_state
        # get the reward for the new state
        reward = self.rewards[next_state[0]][next_state[1]]
        # check if the episode has ended; the reached state is remembered in
        # next_state because reset() overwrites self.current_state
        done = next_state in self.terminal_states
        if done:
            self.reset()
        return reward, next_state, done

## (Tabular) Q-Learning with epsilon-greedy policy

Learn the Q-table for this environment. The updates to `Q(s, a)` are made according to a more efficient iterative approach called Q-learning (somewhat similar to gradient descent, except that the target value changes in each iteration).

In [None]:
def epsilon_greedy_policy(Q, state, epsilon, env):
    """
    Pick the next action with an epsilon-greedy rule.

    Inputs:
        - Q: current Q-table (one row per state index, one column per action index)
        - state: index of the current state (row of Q)
        - epsilon: current epsilon value (probability of choosing a random action)
        - env: simulation model that knows which actions are possible
    Returns:
        - action index (to be used to access env.possible_actions to pick an action)
    """
    draw = random.uniform(0, 1)
    if draw > epsilon:
        # exploitation: action with the highest Q-value in this state
        best_action = np.argmax(Q[state])
        return best_action
    # exploration: random action among those that are useful in this state
    candidates = env.possible_actions_in_state[state]
    chosen_name = random.choice(candidates)
    return env.possible_actions.index(chosen_name)
    
def learn_Q(max_steps=25000,    # number of sampling steps
            learning_rate=0.01, # learning rate for Q update
            gamma=0.99,         # discounting rate for Q next state
            max_epsilon=1.,     # exploration probability at start
            min_epsilon=0.001,  # minimum exploration probability
            decay_rate=0.01,    # exponential decay rate for exploration prob
            seed=15):
    """
    Learn the Q-table of the grid world with tabular Q-learning.

    The agent follows an epsilon-greedy policy. Epsilon starts at max_epsilon
    and is decayed exponentially towards min_epsilon after every finished episode.
    After learning, the Q-table, the state values, and the cumulative reward
    per episode are plotted.

    Inputs:
        - max_steps: number of sampling steps (environment interactions)
        - learning_rate: learning rate for the Q update
        - gamma: discounting rate for the value of the next state
        - max_epsilon: exploration probability at the start
        - min_epsilon: minimum exploration probability
        - decay_rate: exponential decay rate for the exploration probability
        - seed: seed for the `random` and `numpy.random` generators
    Returns:
        - Q: learned Q-table as a numpy array of shape (possible_states x possible_actions)
    """
    # set seed for reproducible results
    random.seed(seed)
    np.random.seed(seed)
    # initialize environment
    env = Environment()
    # initialize the Q-table of size (possible_states x possible_actions)
    Q = np.zeros((len(env.possible_states), len(env.possible_actions)))
    # map each state to its row in Q once, instead of searching the list in every step
    state_index = {s: idx for idx, s in enumerate(env.possible_states)}
    # start with the configured exploration rate
    epsilon = max_epsilon
    # we want to keep track of the cumulative rewards received in each episode
    cum_rewards = []
    total_reward = 0
    # actually learn Q
    for s in range(1, max_steps+1):
        if s % 5000 == 0:
            print("Simulation step: %i" % s, end="\r")
        # get the index of the current state (to index Q)
        state = state_index[env.current_state]
        # select action based on policy
        action = epsilon_greedy_policy(Q, state, epsilon, env)
        # take the action (a) and observe the reward (r) and resulting state (s')
        reward, new_state, done = env.step(env.possible_actions[action])
        total_reward += reward
        if done:
            # we terminated, there is no new state to take into account when updating Q
            target = reward
        else:
            # target := R(s,a) + gamma * max Q(s',a')
            target = reward + gamma * np.max(Q[state_index[new_state]])
        # update Q(s,a) := Q(s,a) + lr [target - Q(s,a)]
        Q[state, action] = Q[state, action] + learning_rate * (target - Q[state, action])
        if done:
            # reduce epsilon (because we need less and less exploration over time)
            epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*env.episode)
            # save the return we got for this episode
            cum_rewards.append(total_reward)
            total_reward = 0
    # visualize what we have learned
    vis_Q(Q, env)
    # plot the cumulative rewards we got for each episode (--> how fast did we learn?)
    plt.figure(figsize=(15, 5))
    plt.plot(list(range(len(cum_rewards))), cum_rewards)
    plt.xlabel("episode")
    plt.ylabel("cumulative reward")
    plt.ylim(-100, 100)
    return Q

def vis_Q(Q, env):
    """
    Plot the learned Q-table and the value of each grid cell.

    Inputs:
        - Q: Q-table with one row per entry of env.possible_states
             and one column per entry of env.possible_actions
        - env: environment providing possible_states, possible_actions and rewards
    """
    action_names = env.possible_actions
    states = env.possible_states

    # first figure: the raw state-action values
    plt.figure(figsize=(4, 7))
    plt.imshow(Q)
    plt.xticks(list(range(len(action_names))), action_names)
    plt.yticks(list(range(len(states))), states)
    plt.title("Q-Table")
    plt.clim(-100, 100)
    plt.colorbar()

    # second figure: value of each state = best Q-value in that state
    # (cells that are not in possible_states keep the value 0)
    value = np.zeros((len(env.rewards), len(env.rewards[0])))
    for i, reward_row in enumerate(env.rewards):
        for j in range(len(reward_row)):
            cell = (i, j)
            if cell in states:
                value[i, j] = np.max(Q[states.index(cell)])
    n_rows, n_cols = value.shape[0], value.shape[1]
    plt.figure()
    plt.imshow(value)
    # tick labels count rows/columns starting at 1
    plt.xticks(list(range(n_cols)), list(range(1, n_cols + 1)))
    plt.yticks(list(range(n_rows)), list(range(1, n_rows + 1)))
    plt.title("value of states")
    plt.colorbar()

In [None]:
# learn Q with default parameters (25000 steps, decay_rate=0.01, seed=15);
# besides returning the Q-table, learn_Q also plots the Q-table, the state values
# and the cumulative reward per episode
# -> finds the best path quite quickly
Q = learn_Q()

In [None]:
# much more exploration (and more steps to do it): 10x the default number of steps
# and a much smaller decay rate, so epsilon shrinks far more slowly per episode
# -> also finds the second path
# NOTE: this overwrites the Q-table from the previous cell
Q = learn_Q(max_steps=250000, decay_rate=0.00001)

## Predict Q-values with a simple `torch` NN

While training a Q-network goes beyond this course, here is a simple example of what the prediction of Q-values could look like with a neural network. In reality, the state vectors wouldn't be one-hot encoded vectors, but instead some meaningful representation of the states such that the RL agent could also generalize to unseen states.

In [None]:
# torch neural network stuff
import torch
import torch.nn as nn

class LinNN(nn.Module):
    """
    Single linear layer (no bias) whose weights are a given Q-table.

    The layer maps a state vector with Q.shape[0] entries to Q.shape[1]
    outputs, one per action.
    """

    def __init__(self, Q):
        super().__init__()
        n_inputs = Q.shape[0]
        n_outputs = Q.shape[1]
        self.l = nn.Linear(n_inputs, n_outputs, bias=False)
        # no training happens here: the layer is filled directly with the
        # transposed Q-table, since nn.Linear stores weights as (outputs x inputs)
        self.l.weight.data = torch.Tensor(Q.T)

    def forward(self, x):
        """Return the output of the linear layer for the input batch x."""
        return self.l(x)

In [None]:
# initialize the network with the learned Q matrix
qnn = LinNN(Q)
# check that the weights are set appropriately
# -> same picture as above for the Q-matrix, just transposed
#    (LinNN stores Q.T, i.e., one row per action and one column per state)
plt.imshow(qnn.l.weight.data)
# use the same color limits as the Q-table plot in vis_Q
plt.clim(-100, 100);

In [None]:
# generate an input feature vector for some test state
test_state = (2, 0)
# get the index for this state
# (a fresh Environment is only needed here for its list of possible states)
env = Environment()
test_state_idx = env.possible_states.index(test_state)
# transform into a one-hot encoded torch vector
# (a batch of size 1 with one entry per possible state)
input_tensor = torch.zeros((1, len(env.possible_states)))
input_tensor[0, test_state_idx] = 1.

In [None]:
# check the Q-network "predictions"
# (the layer has no bias and its weights are Q.T, so a one-hot input
#  selects the Q-values of the corresponding state)
qnn(input_tensor)

In [None]:
# verify that the results are the same as the corresponding row from the Q-matrix
# (one Q-value per action, in the order of env.possible_actions)
Q[test_state_idx]