{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Tabular Reinforcement Learning (with discrete states and actions)\n", "\n", "In this notebook we demonstrate how a RL agent can learn to navigate the grid world environment shown in the book using Q-learning." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# tabular Q-learning is so simple that we don't need an additional library\n", "import random\n", "import numpy as np\n", "import matplotlib.pyplot as plt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Grid Environment\n", "\n", "The following class contains a simulation model of the small grid world environment you've seen in the book." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Environment(object):\n", " \n", " def __init__(self):\n", " # episode ends if the agent dies or finds the money\n", " self.terminal_states = {(1, 2), (3, 1), (3, 5)}\n", " # immediate reward for each state (incl. unreachable states: 0)\n", " self.rewards = [[-1, 0, 0, -1, -1, -1],\n", " [-1, 0, -100000, -1, 0, -1],\n", " [-1, -1, -1, -1, 0, -1],\n", " [-1, -100000, 0, -1, -1, 100]]\n", " # filter all states that can actually be reached\n", " self.possible_states = [(i, j) for i in range(len(self.rewards)) \n", " for j in range(len(self.rewards[i])) if self.rewards[i][j]]\n", " # state transitions via actions (walking into a wall = staying in the same place)\n", " self.possible_actions = [\"right\", \"left\", \"up\", \"down\"]\n", " self.transitions = {\n", " \"right\": [[(0, 0), (0, 1), (0, 2), (0, 4), (0, 5), (0, 5)],\n", " [(1, 0), (1, 1), (1, 3), (1, 3), (1, 4), (1, 5)],\n", " [(2, 1), (2, 2), (2, 3), (2, 3), (2, 4), (2, 5)],\n", " [(3, 1), (3, 1), (3, 2), (3, 4), (3, 5), (3, 5)]],\n", " \"left\": [[(0, 0), (0, 1), (0, 2), (0, 3), (0, 3), (0, 4)],\n", " [(1, 0), (1, 1), (1, 2), (1, 2), (1, 4), (1, 5)],\n", " [(2, 0), (2, 0), (2, 1), (2, 2), (2, 4), (2, 5)],\n", " [(3, 0), (3, 0), (3, 2), (3, 3), (3, 3), (3, 4)]],\n", " \"up\": [[(0, 0), (0, 1), (0, 2), (0, 3), (0, 4), (0, 5)],\n", " [(0, 0), (1, 1), (1, 2), (0, 3), (1, 4), (0, 5)],\n", " [(1, 0), (2, 1), (1, 2), (1, 3), (2, 4), (1, 5)],\n", " [(2, 0), (2, 1), (3, 2), (2, 3), (3, 4), (2, 5)]],\n", " \"down\": [[(1, 0), (0, 1), (0, 2), (1, 3), (0, 4), (1, 5)],\n", " [(2, 0), (1, 1), (2, 2), (2, 3), (1, 4), (2, 5)],\n", " [(3, 0), (3, 1), (2, 2), (3, 3), (2, 4), (3, 5)],\n", " [(3, 0), (3, 1), (3, 2), (3, 3), (3, 4), (3, 5)]],\n", " }\n", " # check which actions per state actually make sense, \n", " # i.e., we don't want to let our agent run into walls (this just wastes time)\n", " self.possible_actions_in_state = []\n", " for (i, j) in self.possible_states:\n", " acts = []\n", " for a in self.possible_actions:\n", " if self.transitions[a][i][j] != (i, j):\n", " acts.append(a)\n", " self.possible_actions_in_state.append(acts)\n", " # get ready for the first episode\n", " self.episode = 0\n", " self.reset()\n", " \n", " def reset(self):\n", " # at the beginning of each episode, the agent always starts in the upper left corner\n", " self.current_state = (0, 0)\n", " self.episode += 1\n", " \n", " def step(self, action):\n", " \"\"\"\n", " This is the main function that run in each time step.\n", " \n", " Inputs:\n", " - action [str]: action the agent took; must be one of self.possible_actions\n", " Returns:\n", " - reward [int]: immediate reward received for reaching the next state\n", " - next state [tuple(int, int)]: coordinates of the next state\n", " - done [bool]: whether the episode terminated and the environment was reset\n", " \"\"\"\n", " # see where this action leads us\n", " self.current_state = self.transitions[action][self.current_state[0]][self.current_state[1]]\n", " # get the reward for the new state\n", " reward = self.rewards[self.current_state[0]][self.current_state[1]]\n", " # check if the episode has ended\n", " if self.current_state in self.terminal_states:\n", " self.reset()\n", " done = True\n", " else:\n", " done = False\n", " return reward, self.current_state, done" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## (Tabular) Q-Learning with epsilon-greedy policy\n", "\n", "Learn the Q-table for this environment. The updates to `Q(s, a)` are made according to a more efficient iterative approach called Q-learning (somewhat similar to gradient decent, only that the target value changes in each iteration)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def epsilon_greedy_policy(Q, state, epsilon, env):\n", " \"\"\"\n", " Choose an action based on the epsilon greedy strategy\n", " \n", " Inputs:\n", " - Q: current Q-table \n", " - state: current state\n", " - epsilon: current epsilon value (probability of choosing a random action)\n", " - env: simulation model that knows which actions are possible\n", " Returns:\n", " - action index (to be used to access env.possible_actions to pick an action)\n", " \"\"\"\n", " # exploitation: best action\n", " if random.uniform(0, 1) > epsilon:\n", " return np.argmax(Q[state])\n", " # exploration: random action\n", " else:\n", " return env.possible_actions.index(random.choice(env.possible_actions_in_state[state]))\n", " \n", "def learn_Q(max_steps=25000, # number of sampling steps\n", " learning_rate=0.01, # learning rate for Q update\n", " gamma=0.99, # discounting rate for Q next state\n", " max_epsilon=1., # exploration probability at start\n", " min_epsilon=0.001, # minimum exploration probability \n", " decay_rate=0.01, # exponential decay rate for exploration prob\n", " seed=15):\n", " # set seed for reproducable results\n", " random.seed(seed)\n", " np.random.seed(seed)\n", " # initialize environment\n", " env = Environment()\n", " # initialize the Q-table of size (possible_states x possible_actions)\n", " Q = np.zeros((len(env.possible_states), len(env.possible_actions)))\n", " # reset exploration rate\n", " epsilon = 1.\n", " # we want to keep track of the cumulative rewards received in each episode\n", " cum_rewards = []\n", " total_reward = 0\n", " # actually learn Q\n", " for s in range(1, max_steps+1):\n", " if not s % 5000:\n", " print(\"Simulation step: %i\" % s, end=\"\\r\")\n", " # get the index of the current state (to index Q)\n", " state = env.possible_states.index(env.current_state)\n", " # select action based on policy\n", " action = epsilon_greedy_policy(Q, state, epsilon, env)\n", " # take the action (a) and observe the reward (r) and resulting state (s')\n", " reward, new_state, done = env.step(env.possible_actions[action])\n", " total_reward += reward\n", " if not done:\n", " # map new_state to index\n", " new_state = env.possible_states.index(new_state)\n", " # update Q(s,a) := Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]\n", " Q[state, action] = Q[state, action] + learning_rate * (reward + gamma * np.max(Q[new_state]) - Q[state, action]) \n", " else:\n", " # we terminated, there is no new state to take into account when updating Q\n", " Q[state, action] = Q[state, action] + learning_rate * (reward - Q[state, action])\n", " # reduce epsilon (because we need less and less exploration over time)\n", " epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*env.episode)\n", " # save the return we got for this episode\n", " cum_rewards.append(total_reward)\n", " total_reward = 0\n", " # visualize what we have learned\n", " vis_Q(Q, env)\n", " # plot the cumulative rewards we got for each episode (--> how fast did we learn?)\n", " plt.figure(figsize=(15, 5))\n", " plt.plot(list(range(len(cum_rewards))), cum_rewards)\n", " plt.xlabel(\"episode\")\n", " plt.ylabel(\"cumulative reward\")\n", " plt.ylim(-100, 100)\n", " return Q\n", "\n", "def vis_Q(Q, env):\n", " # see which state-action values we have learned\n", " plt.figure(figsize=(4, 7))\n", " plt.imshow(Q)\n", " plt.xticks(list(range(len(env.possible_actions))), env.possible_actions)\n", " plt.yticks(list(range(len(env.possible_states))), env.possible_states)\n", " plt.title(\"Q-Table\")\n", " plt.clim(-100, 100)\n", " plt.colorbar();\n", " # plot value of each state\n", " value = np.zeros((len(env.rewards), len(env.rewards[0])))\n", " for i in range(len(env.rewards)):\n", " for j in range(len(env.rewards[i])):\n", " if (i, j) in env.possible_states:\n", " value[i, j] = np.max(Q[env.possible_states.index((i, j))])\n", " plt.figure()\n", " plt.imshow(value)\n", " plt.xticks(list(range(value.shape[1])), list(range(1, value.shape[1]+1)))\n", " plt.yticks(list(range(value.shape[0])), list(range(1, value.shape[0]+1)))\n", " plt.title(\"value of states\")\n", " plt.colorbar();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# learn Q with default parameters\n", "# -> finds the best path quite quickly\n", "Q = learn_Q()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# much more exploration (and more steps to do it)\n", "# -> also finds the second path\n", "Q = learn_Q(max_steps=250000, decay_rate=0.00001)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Predict Q-values with a simple `torch` NN\n", "\n", "While training a Q-network goes beyond this course, here is a simple example of how the prediction of Q-values could look like with a neural network. In reality, the state vectors wouldn't be one-hot encoded vectors, but instead some meaningful representation of the states such that the RL agent could also generalize to unseen states." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# torch neural network stuff\n", "import torch\n", "import torch.nn as nn\n", "\n", "class LinNN(nn.Module):\n", " \n", " def __init__(self, Q):\n", " super(LinNN, self).__init__()\n", " self.l = nn.Linear(Q.shape[0], Q.shape[1], bias=False)\n", " # we're not training the network, but directly initialize it with the optimal weights\n", " self.l.weight.data = torch.Tensor(Q.T)\n", " \n", " def forward(self, x):\n", " y = self.l(x)\n", " return y" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# initialize the network with the learned Q matrix\n", "qnn = LinNN(Q)\n", "# check that the weights are set appropriately\n", "# -> same picture as above for the Q-matrix, just transposed\n", "plt.imshow(qnn.l.weight.data)\n", "plt.clim(-100, 100);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# generate an input feature vector for some test state\n", "test_state = (2, 0)\n", "# get the index for this state\n", "env = Environment()\n", "test_state_idx = env.possible_states.index(test_state)\n", "# transform into a one-hot encoded torch vector\n", "input_tensor = torch.zeros((1, len(env.possible_states)))\n", "input_tensor[0, test_state_idx] = 1." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# check the Q-network \"predictions\"\n", "qnn(input_tensor)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# verify that the results are the same as the corresponding row from the Q-matrix\n", "Q[test_state_idx]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.6" } }, "nbformat": 4, "nbformat_minor": 4 }