diff --git a/README.md b/README.md
index a33de23..700ffb6 100644
--- a/README.md
+++ b/README.md
@@ -88,6 +88,7 @@ Have fun!
 
 ##### Block 5.1:
 - [ ] Read the whole chapter: ["ML Algorithms: Reinforcement Learning"](https://franziskahorn.de/mlbook/_ml_algorithms_reinforcement_learning.html)
+- [ ] Work through [Notebook 8: RL gridmove](/exercises/8_rl_gridmove.ipynb)
 
 ##### Block 5.2:
 - [ ] Answer [Quiz 5](https://forms.gle/fr7PYmP9Exx4Vvrc8)
diff --git a/course_description.pdf b/course_description.pdf
index d4c522f..3a5eaf2 100644
Binary files a/course_description.pdf and b/course_description.pdf differ
diff --git a/exercises/8_rl_gridmove.ipynb b/exercises/8_rl_gridmove.ipynb
new file mode 100644
index 0000000..97595ec
--- /dev/null
+++ b/exercises/8_rl_gridmove.ipynb
@@ -0,0 +1,275 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Reinforcement Learning with discrete states and actions\n",
+    "\n",
+    "In this notebook we demonstrate how an RL agent can learn to navigate the grid world environment shown in the book using Q-learning."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# tabular Q-learning is so simple that we don't need an additional library\n",
+    "import random\n",
+    "import numpy as np\n",
+    "import matplotlib.pyplot as plt"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Grid Environment\n",
+    "\n",
+    "The following class contains a simulation model of the small grid world environment you've seen in the book."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "class Environment(object):\n",
+    "    \n",
+    "    def __init__(self):\n",
+    "        # episode ends if the agent dies or finds the money\n",
+    "        self.terminal_states = {(1, 2), (3, 1), (3, 5)}\n",
+    "        # immediate reward for each state (incl. unreachable states: 0)\n",
+    "        self.rewards = [[-1, 0, 0, -1, -1, -1],\n",
+    "                        [-1, 0, -100000, -1, 0, -1],\n",
+    "                        [-1, -1, -1, -1, 0, -1],\n",
+    "                        [-1, -100000, 0, -1, -1, 100]]\n",
+    "        # filter all states that can actually be reached\n",
+    "        self.possible_states = [(i, j) for i in range(len(self.rewards)) \n",
+    "                                for j in range(len(self.rewards[i])) if self.rewards[i][j]]\n",
+    "        # state transitions via actions (walking into a wall = staying in the same place)\n",
+    "        self.possible_actions = [\"right\", \"left\", \"up\", \"down\"]\n",
+    "        self.transitions = {\n",
+    "            \"right\": [[(0, 0), (0, 1), (0, 2), (0, 4), (0, 5), (0, 5)],\n",
+    "                      [(1, 0), (1, 1), (1, 3), (1, 3), (1, 4), (1, 5)],\n",
+    "                      [(2, 1), (2, 2), (2, 3), (2, 3), (2, 4), (2, 5)],\n",
+    "                      [(3, 1), (3, 1), (3, 2), (3, 4), (3, 5), (3, 5)]],\n",
+    "            \"left\": [[(0, 0), (0, 1), (0, 2), (0, 3), (0, 3), (0, 4)],\n",
+    "                     [(1, 0), (1, 1), (1, 2), (1, 2), (1, 4), (1, 5)],\n",
+    "                     [(2, 0), (2, 0), (2, 1), (2, 2), (2, 4), (2, 5)],\n",
+    "                     [(3, 0), (3, 0), (3, 2), (3, 3), (3, 3), (3, 4)]],\n",
+    "            \"up\": [[(0, 0), (0, 1), (0, 2), (0, 3), (0, 4), (0, 5)],\n",
+    "                   [(0, 0), (1, 1), (1, 2), (0, 3), (1, 4), (0, 5)],\n",
+    "                   [(1, 0), (2, 1), (1, 2), (1, 3), (2, 4), (1, 5)],\n",
+    "                   [(2, 0), (2, 1), (3, 2), (2, 3), (3, 4), (2, 5)]],\n",
+    "            \"down\": [[(1, 0), (0, 1), (0, 2), (1, 3), (0, 4), (1, 5)],\n",
+    "                     [(2, 0), (1, 1), (2, 2), (2, 3), (1, 4), (2, 5)],\n",
+    "                     [(3, 0), (3, 1), (2, 2), (3, 3), (2, 4), (3, 5)],\n",
+    "                     [(3, 0), (3, 1), (3, 2), (3, 3), (3, 4), (3, 5)]],\n",
+    "        }\n",
+    "        # check which actions per state actually make sense, \n",
+    "        # i.e., we don't want to let our agent run into walls (this just wastes time)\n",
+    "        self.possible_actions_in_state = []\n",
+    "        for (i, j) in self.possible_states:\n",
+    "            acts = []\n",
+    "            for a in self.possible_actions:\n",
+    "                if self.transitions[a][i][j] != (i, j):\n",
+    "                    acts.append(a)\n",
+    "            self.possible_actions_in_state.append(acts)\n",
+    "        # get ready for the first episode\n",
+    "        self.episode = 0\n",
+    "        self.reset()\n",
+    "    \n",
+    "    def reset(self):\n",
+    "        # at the beginning of each episode, the agent always starts in the upper left corner\n",
+    "        self.current_state = (0, 0)\n",
+    "        self.episode += 1\n",
+    "    \n",
+    "    def step(self, action):\n",
+    "        \"\"\"\n",
+    "        This is the main function that runs in each time step.\n",
+    "        \n",
+    "        Inputs:\n",
+    "            - action [str]: action the agent took; must be one of self.possible_actions\n",
+    "        Returns:\n",
+    "            - reward [int]: immediate reward received for reaching the next state\n",
+    "            - next state [tuple(int, int)]: coordinates of the next state\n",
+    "            - done [bool]: whether the episode terminated and the environment was reset\n",
+    "        \"\"\"\n",
+    "        # see where this action leads us\n",
+    "        self.current_state = self.transitions[action][self.current_state[0]][self.current_state[1]]\n",
+    "        # get the reward for the new state\n",
+    "        reward = self.rewards[self.current_state[0]][self.current_state[1]]\n",
+    "        # check if the episode has ended\n",
+    "        if self.current_state in self.terminal_states:\n",
+    "            self.reset()\n",
+    "            done = True\n",
+    "        else:\n",
+    "            done = False\n",
+    "        return reward, self.current_state, done"
+   ]
+  },
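+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Before learning anything, we can interact with the environment directly to see how the `step` interface behaves. This is just a small sketch using the class above; the particular action sequence is arbitrary."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# take a few (arbitrary) actions and look at the rewards we receive;\n",
+    "# note that reaching a terminal state resets the environment to the start state\n",
+    "env = Environment()\n",
+    "for a in [\"down\", \"down\", \"down\", \"right\"]:\n",
+    "    reward, state, done = env.step(a)\n",
+    "    print(f\"action: {a:>5} -> state: {state}, reward: {reward}, done: {done}\")"
+   ]
+  },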
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## (Tabular) Q-Learning with epsilon-greedy policy\n",
+    "\n",
+    "Learn the Q-table for this environment. The updates to `Q(s, a)` are made using a more efficient iterative approach called Q-learning (somewhat similar to gradient descent, except that the target value changes in each iteration)."
+   ]
+  },
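+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Concretely, after taking action $a$ in state $s$, receiving reward $r$ and ending up in state $s'$, the code below updates the corresponding table entry as\n",
+    "\n",
+    "$$Q(s, a) \\leftarrow Q(s, a) + \\alpha \\left(r + \\gamma \\max_{a'} Q(s', a') - Q(s, a)\\right),$$\n",
+    "\n",
+    "where $\\alpha$ is the learning rate and $\\gamma$ the discount factor (`learning_rate` and `gamma` in the code); if $s'$ is a terminal state, the target is just $r$."
+   ]
+  },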
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def epsilon_greedy_policy(Q, state, epsilon, env):\n",
+    "    \"\"\"\n",
+    "    Choose an action based on the epsilon-greedy strategy\n",
+    "    \n",
+    "    Inputs:\n",
+    "        - Q: current Q-table \n",
+    "        - state: current state\n",
+    "        - epsilon: current epsilon value (probability of choosing a random action)\n",
+    "        - env: simulation model that knows which actions are possible\n",
+    "    Returns:\n",
+    "        - action index (to be used to access env.possible_actions to pick an action)\n",
+    "    \"\"\"\n",
+    "    # exploitation: best action\n",
+    "    if random.uniform(0, 1) > epsilon:\n",
+    "        return np.argmax(Q[state])\n",
+    "    # exploration: random action\n",
+    "    else:\n",
+    "        return env.possible_actions.index(random.choice(env.possible_actions_in_state[state]))\n",
+    "    \n",
+    "def learn_Q(max_steps=25000,      # number of sampling steps\n",
+    "            learning_rate=0.01,   # learning rate for Q update\n",
+    "            gamma=0.99,           # discounting rate for Q next state\n",
+    "            max_epsilon=1.,       # exploration probability at start\n",
+    "            min_epsilon=0.001,    # minimum exploration probability \n",
+    "            decay_rate=0.01,      # exponential decay rate for exploration prob\n",
+    "            seed=15):\n",
+    "    # set seed for reproducible results\n",
+    "    random.seed(seed)\n",
+    "    np.random.seed(seed)\n",
+    "    # initialize environment\n",
+    "    env = Environment()\n",
+    "    # initialize the Q-table of size (possible_states x possible_actions)\n",
+    "    Q = np.zeros((len(env.possible_states), len(env.possible_actions)))\n",
+    "    # reset exploration rate\n",
+    "    epsilon = 1.\n",
+    "    # we want to keep track of the cumulative rewards received in each episode\n",
+    "    cum_rewards = []\n",
+    "    total_reward = 0\n",
+    "    # actually learn Q\n",
+    "    for s in range(1, max_steps+1):\n",
+    "        if not s % 5000:\n",
+    "            print(\"Simulation step: %i\" % s, end=\"\\r\")\n",
+    "        # get the index of the current state (to index Q)\n",
+    "        state = env.possible_states.index(env.current_state)\n",
+    "        # select action based on policy\n",
+    "        action = epsilon_greedy_policy(Q, state, epsilon, env)\n",
+    "        # take the action (a) and observe the reward (r) and resulting state (s')\n",
+    "        reward, new_state, done = env.step(env.possible_actions[action])\n",
+    "        total_reward += reward\n",
+    "        if not done:\n",
+    "            # map new_state to index\n",
+    "            new_state = env.possible_states.index(new_state)\n",
+    "            # update Q(s,a) := Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]\n",
+    "            Q[state, action] = Q[state, action] + learning_rate * (reward + gamma * np.max(Q[new_state]) - Q[state, action]) \n",
+    "        else:\n",
+    "            # we terminated, there is no new state to take into account when updating Q\n",
+    "            Q[state, action] = Q[state, action] + learning_rate * (reward - Q[state, action])\n",
+    "            # reduce epsilon (because we need less and less exploration over time)\n",
+    "            epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*env.episode)\n",
+    "            # save the return we got for this episode\n",
+    "            cum_rewards.append(total_reward)\n",
+    "            total_reward = 0\n",
+    "    # visualize what we have learned\n",
+    "    vis_Q(Q, env)\n",
+    "    # plot the cumulative rewards we got for each episode (--> how fast did we learn?)\n",
+    "    plt.figure(figsize=(15, 5))\n",
+    "    plt.plot(list(range(len(cum_rewards))), cum_rewards)\n",
+    "    plt.xlabel(\"episode\")\n",
+    "    plt.ylabel(\"cumulative reward\")\n",
+    "    plt.ylim(-100, 100)\n",
+    "    return Q, cum_rewards\n",
+    "\n",
+    "def vis_Q(Q, env):\n",
+    "    # see which state-action values we have learned\n",
+    "    plt.figure(figsize=(4, 7))\n",
+    "    plt.imshow(Q)\n",
+    "    plt.xticks(list(range(len(env.possible_actions))), env.possible_actions)\n",
+    "    plt.yticks(list(range(len(env.possible_states))), env.possible_states)\n",
+    "    plt.title(\"Q-Table\")\n",
+    "    plt.clim(-100, 100)\n",
+    "    plt.colorbar();\n",
+    "    # plot value of each state\n",
+    "    value = np.zeros((len(env.rewards), len(env.rewards[0])))\n",
+    "    for i in range(len(env.rewards)):\n",
+    "        for j in range(len(env.rewards[i])):\n",
+    "            if (i, j) in env.possible_states:\n",
+    "                value[i, j] = np.max(Q[env.possible_states.index((i, j))])\n",
+    "    plt.figure()\n",
+    "    plt.imshow(value)\n",
+    "    plt.xticks(list(range(value.shape[1])), list(range(1, value.shape[1]+1)))\n",
+    "    plt.yticks(list(range(value.shape[0])), list(range(1, value.shape[0]+1)))\n",
+    "    plt.title(\"value of states\")\n",
+    "    plt.colorbar();"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# learn Q with default parameters\n",
+    "# -> finds the best path quite quickly\n",
+    "Q, cum_rewards = learn_Q()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# much more exploration (and more steps to do it)\n",
+    "# -> also finds the second path\n",
+    "Q, cum_rewards = learn_Q(max_steps=250000, decay_rate=0.00001)"
+   ]
+  },
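+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To double-check what the agent has learned, we can read the greedy policy off the Q-table, i.e., for each state pick the action with the highest Q-value. This is only a small sketch on top of the functions above; it assumes `Q` holds the Q-table returned by the last call to `learn_Q`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# print the greedy action for each reachable, non-terminal state\n",
+    "env = Environment()\n",
+    "for idx, state in enumerate(env.possible_states):\n",
+    "    if state not in env.terminal_states:\n",
+    "        print(state, \"->\", env.possible_actions[np.argmax(Q[idx])])"
+   ]
+  },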
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.8.5"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}