{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Tabular Reinforcement Learning (with discrete states and actions)\n",
"\n",
"In this notebook, we demonstrate how an RL agent can learn to navigate the grid world environment shown in the book using Q-learning."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# tabular Q-learning is so simple that we don't need an additional library\n",
"import random\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Grid Environment\n",
"\n",
"The following class contains a simulation model of the small grid world environment you've seen in the book."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class Environment(object):\n",
"\n",
"    def __init__(self):\n",
"        # episode ends if the agent dies or finds the money\n",
"        self.terminal_states = {(1, 2), (3, 1), (3, 5)}\n",
"        # immediate reward for each state (incl. unreachable states: 0)\n",
"        self.rewards = [[-1, 0, 0, -1, -1, -1],\n",
"                        [-1, 0, -100000, -1, 0, -1],\n",
"                        [-1, -1, -1, -1, 0, -1],\n",
"                        [-1, -100000, 0, -1, -1, 100]]\n",
"        # filter all states that can actually be reached (non-zero reward)\n",
"        self.possible_states = [(i, j) for i in range(len(self.rewards))\n",
"                                for j in range(len(self.rewards[i])) if self.rewards[i][j]]\n",
"        # state transitions via actions (walking into a wall = staying in the same place)\n",
"        self.possible_actions = [\"right\", \"left\", \"up\", \"down\"]\n",
"        self.transitions = {\n",
"            \"right\": [[(0, 0), (0, 1), (0, 2), (0, 4), (0, 5), (0, 5)],\n",
"                      [(1, 0), (1, 1), (1, 3), (1, 3), (1, 4), (1, 5)],\n",
"                      [(2, 1), (2, 2), (2, 3), (2, 3), (2, 4), (2, 5)],\n",
"                      [(3, 1), (3, 1), (3, 2), (3, 4), (3, 5), (3, 5)]],\n",
"            \"left\": [[(0, 0), (0, 1), (0, 2), (0, 3), (0, 3), (0, 4)],\n",
"                     [(1, 0), (1, 1), (1, 2), (1, 2), (1, 4), (1, 5)],\n",
"                     [(2, 0), (2, 0), (2, 1), (2, 2), (2, 4), (2, 5)],\n",
"                     [(3, 0), (3, 0), (3, 2), (3, 3), (3, 3), (3, 4)]],\n",
"            \"up\": [[(0, 0), (0, 1), (0, 2), (0, 3), (0, 4), (0, 5)],\n",
"                   [(0, 0), (1, 1), (1, 2), (0, 3), (1, 4), (0, 5)],\n",
"                   [(1, 0), (2, 1), (1, 2), (1, 3), (2, 4), (1, 5)],\n",
"                   [(2, 0), (2, 1), (3, 2), (2, 3), (3, 4), (2, 5)]],\n",
"            \"down\": [[(1, 0), (0, 1), (0, 2), (1, 3), (0, 4), (1, 5)],\n",
"                     [(2, 0), (1, 1), (2, 2), (2, 3), (1, 4), (2, 5)],\n",
"                     [(3, 0), (3, 1), (2, 2), (3, 3), (2, 4), (3, 5)],\n",
"                     [(3, 0), (3, 1), (3, 2), (3, 3), (3, 4), (3, 5)]],\n",
"        }\n",
"        # check which actions per state actually make sense,\n",
"        # i.e., we don't want to let our agent run into walls (this just wastes time)\n",
"        self.possible_actions_in_state = []\n",
"        for (i, j) in self.possible_states:\n",
"            acts = []\n",
"            for a in self.possible_actions:\n",
"                if self.transitions[a][i][j] != (i, j):\n",
"                    acts.append(a)\n",
"            self.possible_actions_in_state.append(acts)\n",
"        # get ready for the first episode\n",
"        self.episode = 0\n",
"        self.reset()\n",
"\n",
"    def reset(self):\n",
"        # at the beginning of each episode, the agent always starts in the upper left corner\n",
"        self.current_state = (0, 0)\n",
"        self.episode += 1\n",
"\n",
"    def step(self, action):\n",
"        \"\"\"\n",
"        This is the main function that runs in each time step.\n",
"\n",
"        Inputs:\n",
"            - action [str]: action the agent took; must be one of self.possible_actions\n",
"        Returns:\n",
"            - reward [int]: immediate reward received for reaching the next state\n",
"            - next state [tuple(int, int)]: coordinates of the next state\n",
"              (the start state if the episode terminated, since the environment is reset)\n",
"            - done [bool]: whether the episode terminated and the environment was reset\n",
"        \"\"\"\n",
"        # see where this action leads us\n",
"        self.current_state = self.transitions[action][self.current_state[0]][self.current_state[1]]\n",
"        # get the reward for the new state\n",
"        reward = self.rewards[self.current_state[0]][self.current_state[1]]\n",
"        # check if the episode has ended\n",
"        if self.current_state in self.terminal_states:\n",
"            self.reset()\n",
"            done = True\n",
"        else:\n",
"            done = False\n",
"        return reward, self.current_state, done"
]
},
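{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the class above, the reachable states are the grid cells with a non-zero immediate reward. The following is a minimal standalone sketch of that filter, using the same reward grid; `rewards` and `possible_states` are plain local variables here (an assumption made for illustration), not attributes of `Environment`.\n",
"\n",
"```python\n",
"# same reward grid as in Environment; 0 marks cells the agent cannot enter\n",
"rewards = [[-1, 0, 0, -1, -1, -1],\n",
"           [-1, 0, -100000, -1, 0, -1],\n",
"           [-1, -1, -1, -1, 0, -1],\n",
"           [-1, -100000, 0, -1, -1, 100]]\n",
"\n",
"# keep the (row, column) coordinates of every cell with a non-zero reward\n",
"possible_states = [(i, j) for i, row in enumerate(rewards)\n",
"                   for j, r in enumerate(row) if r]\n",
"\n",
"print(len(possible_states))\n",
"print((0, 1) in possible_states, (3, 5) in possible_states)\n",
"```\n",
"\n",
"The position of a coordinate pair in this list is what later serves as the row index into the Q-table."
]
},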
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## (Tabular) Q-Learning with epsilon-greedy policy\n",
"\n",
"Learn the Q-table for this environment. The updates to `Q(s, a)` are made according to a more efficient iterative approach called Q-learning (somewhat similar to gradient descent, except that the target value changes in each iteration)."
]
},
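{
"cell_type": "markdown",
"metadata": {},
"source": [
"A single update step of this rule can be sketched in isolation. This is only an illustration under stated assumptions: the helper name `q_update`, the list-of-lists table `Q_toy`, and the toy numbers are made up for this example and are not part of the notebook's own code.\n",
"\n",
"```python\n",
"def q_update(Q, state, action, reward, next_state, done, learning_rate=0.01, gamma=0.99):\n",
"    # hypothetical helper: Q is a list of rows, one row of action values per state\n",
"    # target = r if the episode ended, else r + gamma * max_a' Q(s', a')\n",
"    target = reward if done else reward + gamma * max(Q[next_state])\n",
"    # move Q(s, a) a small step towards the target\n",
"    Q[state][action] += learning_rate * (target - Q[state][action])\n",
"    return Q\n",
"\n",
"# toy table with 2 states and 2 actions (made-up values)\n",
"Q_toy = [[0.0, 0.0], [5.0, 10.0]]\n",
"q_update(Q_toy, state=0, action=1, reward=-1, next_state=1, done=False)\n",
"print(Q_toy[0][1])  # approximately 0.01 * (-1 + 0.99 * 10)\n",
"```\n",
"\n",
"Because the target itself depends on the current estimate `max(Q[next_state])`, it shifts as the table is updated, which is the difference to a fixed-target gradient step mentioned above."
]
},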
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def epsilon_greedy_policy(Q, state, epsilon, env):\n",
"    \"\"\"\n",
"    Choose an action based on the epsilon-greedy strategy\n",
"\n",
"    Inputs:\n",
"        - Q: current Q-table\n",
"        - state: index of the current state (row of Q)\n",
"        - epsilon: current epsilon value (probability of choosing a random action)\n",
"        - env: simulation model that knows which actions are possible\n",
"    Returns:\n",
"        - action index (to be used to access env.possible_actions to pick an action)\n",
"    \"\"\"\n",
"    # exploitation: best action\n",
"    if random.uniform(0, 1) > epsilon:\n",
"        return np.argmax(Q[state])\n",
"    # exploration: random action (only among those that don't run into a wall)\n",
"    else:\n",
"        return env.possible_actions.index(random.choice(env.possible_actions_in_state[state]))\n",
"\n",
"def learn_Q(max_steps=25000,      # number of sampling steps\n",
"            learning_rate=0.01,   # learning rate for Q update\n",
"            gamma=0.99,           # discounting rate for Q next state\n",
"            max_epsilon=1.,       # exploration probability at start\n",
"            min_epsilon=0.001,    # minimum exploration probability\n",
"            decay_rate=0.01,      # exponential decay rate for exploration prob\n",
"            seed=15):\n",
"    # set seed for reproducible results\n",
"    random.seed(seed)\n",
"    np.random.seed(seed)\n",
"    # initialize environment\n",
"    env = Environment()\n",
"    # initialize the Q-table of size (possible_states x possible_actions)\n",
"    Q = np.zeros((len(env.possible_states), len(env.possible_actions)))\n",
"    # start with the maximum exploration rate\n",
"    epsilon = max_epsilon\n",
"    # we want to keep track of the cumulative rewards received in each episode\n",
"    cum_rewards = []\n",
"    total_reward = 0\n",
"    # actually learn Q\n",
"    for s in range(1, max_steps+1):\n",
"        if not s % 5000:\n",
"            print(\"Simulation step: %i\" % s, end=\"\\r\")\n",
"        # get the index of the current state (to index Q)\n",
"        state = env.possible_states.index(env.current_state)\n",
"        # select action based on policy\n",
"        action = epsilon_greedy_policy(Q, state, epsilon, env)\n",
"        # take the action (a) and observe the reward (r) and resulting state (s')\n",
"        reward, new_state, done = env.step(env.possible_actions[action])\n",
"        total_reward += reward\n",
"        if not done:\n",
"            # map new_state to index\n",
"            new_state = env.possible_states.index(new_state)\n",
"            # update Q(s,a) := Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]\n",
"            Q[state, action] = Q[state, action] + learning_rate * (reward + gamma * np.max(Q[new_state]) - Q[state, action])\n",
"        else:\n",
"            # we terminated, there is no new state to take into account when updating Q\n",
"            Q[state, action] = Q[state, action] + learning_rate * (reward - Q[state, action])\n",
"            # reduce epsilon (because we need less and less exploration over time)\n",
"            epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*env.episode)\n",
"            # save the return we got for this episode\n",
"            cum_rewards.append(total_reward)\n",
"            total_reward = 0\n",
"    # visualize what we have learned\n",
"    vis_Q(Q, env)\n",
"    # plot the cumulative rewards we got for each episode (--> how fast did we learn?)\n",
"    plt.figure(figsize=(15, 5))\n",
"    plt.plot(list(range(len(cum_rewards))), cum_rewards)\n",
"    plt.xlabel(\"episode\")\n",
"    plt.ylabel(\"cumulative reward\")\n",
"    plt.ylim(-100, 100)\n",
"    return Q\n",
"\n",
"def vis_Q(Q, env):\n",
"    # see which state-action values we have learned\n",
"    plt.figure(figsize=(4, 7))\n",
"    plt.imshow(Q)\n",
"    plt.xticks(list(range(len(env.possible_actions))), env.possible_actions)\n",
"    plt.yticks(list(range(len(env.possible_states))), env.possible_states)\n",
"    plt.title(\"Q-Table\")\n",
"    plt.clim(-100, 100)\n",
"    plt.colorbar();\n",
"    # plot value of each state (= Q-value of the best action in that state)\n",
"    value = np.zeros((len(env.rewards), len(env.rewards[0])))\n",
"    for i in range(len(env.rewards)):\n",
"        for j in range(len(env.rewards[i])):\n",
"            if (i, j) in env.possible_states:\n",
"                value[i, j] = np.max(Q[env.possible_states.index((i, j))])\n",
"    plt.figure()\n",
"    plt.imshow(value)\n",
"    plt.xticks(list(range(value.shape[1])), list(range(1, value.shape[1]+1)))\n",
"    plt.yticks(list(range(value.shape[0])), list(range(1, value.shape[0]+1)))\n",
"    plt.title(\"value of states\")\n",
"    plt.colorbar();"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# learn Q with default parameters\n",
"# -> finds the best path quite quickly\n",
"Q = learn_Q()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# much more exploration (and more steps to do it)\n",
"# -> also finds the second path\n",
"Q = learn_Q(max_steps=250000, decay_rate=0.00001)"
]
},
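{
"cell_type": "markdown",
"metadata": {},
"source": [
"To see why a smaller `decay_rate` means much more exploration, the epsilon schedule from `learn_Q` can be evaluated on its own. This is a minimal sketch; the helper name `epsilon_at` is an assumption made for this example, while the formula and default values are taken from `learn_Q`.\n",
"\n",
"```python\n",
"import math\n",
"\n",
"def epsilon_at(episode, max_epsilon=1.0, min_epsilon=0.001, decay_rate=0.01):\n",
"    # hypothetical helper: same exponential schedule as in learn_Q\n",
"    return min_epsilon + (max_epsilon - min_epsilon) * math.exp(-decay_rate * episode)\n",
"\n",
"# compare the two decay rates used above after 0, 100, and 1000 episodes\n",
"for decay_rate in (0.01, 0.00001):\n",
"    values = [round(epsilon_at(ep, decay_rate=decay_rate), 3) for ep in (0, 100, 1000)]\n",
"    print(decay_rate, values)\n",
"```\n",
"\n",
"With the default rate, epsilon is close to its minimum after 1000 episodes, whereas with `decay_rate=0.00001` the agent still acts almost entirely at random at that point."
]
},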
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Predict Q-values with a simple `torch` NN\n",
"\n",
"While training a Q-network goes beyond this course, here is a simple example of what the prediction of Q-values could look like with a neural network. In reality, the state vectors wouldn't be one-hot encoded vectors, but instead some meaningful representation of the states, such that the RL agent could also generalize to unseen states."
]
},
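{
"cell_type": "markdown",
"metadata": {},
"source": [
"The reason a bias-free linear layer can reproduce the Q-table is that multiplying a one-hot state vector with the table selects the row of that state. The sketch below shows this with plain Python lists instead of `torch`; `Q_toy`, `state_idx`, and the numbers are made up for illustration.\n",
"\n",
"```python\n",
"# toy Q-table with 3 states and 2 actions (made-up values)\n",
"Q_toy = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]\n",
"state_idx = 2\n",
"\n",
"# one-hot encoding of the state: 1 at the state's index, 0 elsewhere\n",
"one_hot = [1.0 if i == state_idx else 0.0 for i in range(len(Q_toy))]\n",
"\n",
"# a bias-free linear layer computes x @ W.T; with W = Q.T this equals x @ Q\n",
"q_values = [sum(x * row[a] for x, row in zip(one_hot, Q_toy))\n",
"            for a in range(len(Q_toy[0]))]\n",
"\n",
"print(q_values == Q_toy[state_idx])\n",
"```\n",
"\n",
"With a learned, non-one-hot state representation, the same layer would instead mix the rows, which is what allows generalization to states that were never visited."
]
},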
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# torch neural network stuff\n",
"import torch\n",
"import torch.nn as nn\n",
"\n",
"class LinNN(nn.Module):\n",
"\n",
"    def __init__(self, Q):\n",
"        super(LinNN, self).__init__()\n",
"        self.l = nn.Linear(Q.shape[0], Q.shape[1], bias=False)\n",
"        # we're not training the network, but directly initialize it with the optimal weights\n",
"        self.l.weight.data = torch.Tensor(Q.T)\n",
"\n",
"    def forward(self, x):\n",
"        y = self.l(x)\n",
"        return y"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# initialize the network with the learned Q matrix\n",
"qnn = LinNN(Q)\n",
"# check that the weights are set appropriately\n",
"# -> same picture as above for the Q-matrix, just transposed\n",
"plt.imshow(qnn.l.weight.data)\n",
"plt.clim(-100, 100);"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# generate an input feature vector for some test state\n",
"test_state = (2, 0)\n",
"# get the index for this state\n",
"env = Environment()\n",
"test_state_idx = env.possible_states.index(test_state)\n",
"# transform into a one-hot encoded torch vector\n",
"input_tensor = torch.zeros((1, len(env.possible_states)))\n",
"input_tensor[0, test_state_idx] = 1."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# check the Q-network \"predictions\"\n",
"qnn(input_tensor)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# verify that the results are the same as the corresponding row from the Q-matrix\n",
"Q[test_state_idx]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}