From c6e0ee5c05bcb09680daf98dc669052f1670412e Mon Sep 17 00:00:00 2001 From: Arthur DANJOU Date: Fri, 30 Jan 2026 14:49:47 +0100 Subject: [PATCH] Implement feature X to enhance user experience and fix bug Y in module Z --- ...and Figure 8_2 in [Sutton and Barto].ipynb | 522 ++++++++++++++++++ 1 file changed, 522 insertions(+) create mode 100644 M2/Reinforcement Learning/Lab 6A Dyna-Q and Figure 8_2 in [Sutton and Barto].ipynb diff --git a/M2/Reinforcement Learning/Lab 6A Dyna-Q and Figure 8_2 in [Sutton and Barto].ipynb b/M2/Reinforcement Learning/Lab 6A Dyna-Q and Figure 8_2 in [Sutton and Barto].ipynb new file mode 100644 index 0000000..53cea94 --- /dev/null +++ b/M2/Reinforcement Learning/Lab 6A Dyna-Q and Figure 8_2 in [Sutton and Barto].ipynb @@ -0,0 +1,522 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "27bda6b2", + "metadata": {}, + "source": [ + "\n", + "# Lab6A: Dyna-Q and Figure 8.2 in [Sutton & Barto]\n", + "\n", + "This lab reproduces **Figure 8.2** from *Reinforcement Learning: An Introduction*\n", + "(Sutton & Barto, Chapter 8).\n", + "\n", + "We study **Dyna-Q** in a simple deterministic maze and analyze how the number of\n", + "planning steps `n` affects learning speed, measured in **steps per episode**.\n", + "\n", + "You will:\n", + "- Build the **Dyna Maze** (47 states)\n", + "- Implement a **Dyna-Q algorithm**\n", + "- Reproduce the learning curves for `n = 0, 5, 50`\n", + "- Understand *why* planning accelerates learning\n", + "\n", + "---\n" + ] + }, + { + "cell_type": "markdown", + "id": "009b4a82", + "metadata": {}, + "source": [ + "\n", + "## 1. 
@dataclass
class DynaMazeEnv:
    """Deterministic gridworld maze for the Dyna-Q experiment (Figure 8.2, Sutton & Barto).

    The default grid is 6 x 9 with 7 blocked cells, which leaves 47 free cells;
    each free cell is one state, numbered in row-major order. Moving off the grid
    or into a blocked cell leaves the agent where it is. Entering the goal cell
    yields reward 1.0 and ends the episode; every other transition yields 0.0.

    Args:
        rows (int): number of rows in the maze grid
        cols (int): number of columns in the maze grid
        gamma (float): discount factor; stored on the instance but not read by
            any method of this class

    """

    rows: int = 6  # default maze is 6 x 9
    cols: int = 9
    gamma: float = 0.95  # discount factor

    # Action id -> (row delta, column delta).
    action_moves: ClassVar[dict[int, tuple[int, int]]] = {
        0: (-1, 0),  # up
        1: (0, 1),  # right
        2: (1, 0),  # down
        3: (0, -1),  # left
    }

    def __post_init__(self) -> None:
        """Build the walls, the start/goal cells and the cell <-> state-id tables, then reset."""
        # Two vertical wall segments plus one isolated block: 7 blocked cells.
        left_segment = {(row, 2) for row in (1, 2, 3)}
        right_segment = {(row, 7) for row in (0, 1, 2)}
        self.walls = left_segment | right_segment | {(4, 5)}

        self.start_rc = (2, 0)
        self.goal_rc = (0, 8)

        # Free cells in row-major order; the position in this list is the state id.
        free_cells = [
            (row, col)
            for row in range(self.rows)
            for col in range(self.cols)
            if (row, col) not in self.walls
        ]
        self.s_to_rc = dict(enumerate(free_cells))
        self.rc_to_s = {cell: idx for idx, cell in self.s_to_rc.items()}

        s = len(free_cells)
        # Guards the hard-coded wall list: 6 * 9 - 7 = 47 free cells are expected.
        assert s == 47, f"Expected 47 states, got {s}"  # noqa: PLR2004, S101
        self.n_states = s
        self.n_actions = 4
        self.reset()

    def reset(self) -> int:
        """Put the agent back on the start cell and return that state id."""
        self.state = self.rc_to_s[self.start_rc]
        return self.state

    def step(self, action: int) -> tuple[int, float, bool]:
        """Apply `action` from the current state and return (next_state, reward, done)."""
        row, col = self.s_to_rc[self.state]
        d_row, d_col = self.action_moves[action]
        target = (row + d_row, col + d_col)

        # A move that leaves the grid or hits a wall is cancelled: the agent stays put.
        in_grid = 0 <= target[0] < self.rows and 0 <= target[1] < self.cols
        if not in_grid or target in self.walls:
            target = (row, col)

        reached_goal = target == self.goal_rc
        self.state = self.rc_to_s[target]
        return self.state, (1.0 if reached_goal else 0.0), reached_goal
Take real step in environment\n", + "3. Q-learning update\n", + "4. Store transition in the model\n", + "5. Perform `n` planning updates by sampling from the model\n" + ] + }, + { + "cell_type": "markdown", + "id": "adc560ec", + "metadata": {}, + "source": [ + "\n", + "### Hints\n", + "\n", + "- Initialize all Q-values to zero\n", + "- Use a dictionary for the model: `(s, a) -> (r, s')`\n", + "- Planning updates are **identical** to Q-learning updates\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "1a5f489d", + "metadata": {}, + "outputs": [], + "source": [ + "def epsilon_greedy_action(\n", + " Q_row: np.ndarray,\n", + " eps: float,\n", + " rng: np.random.Generator,\n", + ") -> int:\n", + " \"\"\"Implement an ε-greedy policy. Here, Q_row is the vector Q(s, .) for a fixed state s.\"\"\"\n", + " if rng.random() < eps: # With probability epsilon: explore (choose a random action)\n", + " return int(\n", + " rng.integers(0, len(Q_row)),\n", + " ) # With probability 1-ε: exploit (choose an action with the highest estimated value)\n", + " maxv = np.max(Q_row) # Finds the maximum Q-value among actions.\n", + " best = np.flatnonzero(\n", + " Q_row == maxv,\n", + " ) # here can be ties, means several actions have the same Q value,\n", + " # Q_row == maxv gives a boolean array like [True, False, True, ...]\n", + " # np.flatnonzero(...) returns the indices where it's True (a 1D array), e.g. 
[0, 2]\n", + " return int(\n", + " rng.choice(best),\n", + " ) # If multiple best actions exist, pick one uniformly at random and this prevents a systematic bias like always choosing the smallest index action.\n", + "\n", + "\n", + "class DynaQAgent:\n", + " \"\"\"Now we define a learning agent that implements the Dyna-Q algorithm.\"\"\"\n", + "\n", + " def __init__( # noqa: PLR0913\n", + " self,\n", + " n_states: int,\n", + " n_actions: int,\n", + " alpha: float = 0.1,\n", + " gamma: float = 0.95,\n", + " eps: float = 0.1,\n", + " planning_steps: int = 5,\n", + " rng: np.random.Generator | None = None,\n", + " ) -> None:\n", + " \"\"\"Initialize the Dyna-Q agent with parameters.\n", + "\n", + " Args:\n", + " n_states (int): number of states in the environment\n", + " n_actions (int): number of actions in the environment\n", + " alpha (float): learning rate for Q-learning updates\n", + " gamma (float): discount factor for future rewards\n", + " eps (float): epsilon for epsilon-greedy action selection\n", + " planning_steps (int): number of planning updates per real step\n", + " rng (np.random.Generator, optional): random number generator for reproducibility\n", + "\n", + " \"\"\"\n", + " self.Q = np.zeros(\n", + " (n_states, n_actions),\n", + " ) # This is the aabular Q-learning table with rows = states and columns = actions,\n", + " # Initially all values are 0\n", + " self.alpha = alpha # alpha = learning rate (step size) for Q updates.\n", + " self.gamma = gamma # gamma = discount factor used in TD target.\n", + " self.eps = eps # eps = epsilon for epsilon-greedy exploration.\n", + " self.n = planning_steps # This is the “n” in Dyna-Q: number of planning updates per real step.\n", + " self.rng = (\n", + " rng or np.random.default_rng()\n", + " ) # If we provide a random generator, use it (for reproducibility), otherwise, creat a new one.\n", + "\n", + " self.model = {} # This is the learned model is stored as a dictionary, with Key: (s, a) and value: (r, s'). 
Here we assume that the maze dynamics are deterministic.\n", + " self.observed_sa = [] # A list of all (s,a) pairs that have been seen at least once.\n", + " # Used so planning can sample only from experienced pairs (instead of impossible ones).\n", + "\n", + " def act(self, s: int) -> int:\n", + " \"\"\"Select an action using an ε-greedy policy based on the current Q-values.\"\"\"\n", + " return epsilon_greedy_action(\n", + " self.Q[s],\n", + " self.eps,\n", + " self.rng,\n", + " ) # self.Q[s] extracts the row vector Q(s, :) (the value of actions for state s) then we pass it to epsilon-greedy.\n", + "\n", + " def q_update(self, s: int, a: int, r: float, sp: int) -> None:\n", + " \"\"\"Perform a Q-learning update for a given transition.\"\"\"\n", + " td_target = r + self.gamma * np.max(self.Q[sp]) # sp here means s'\n", + " self.Q[s, a] += self.alpha * (td_target - self.Q[s, a]) # Q-learning update\n", + "\n", + " def observe(\n", + " self,\n", + " s: int,\n", + " a: int,\n", + " r: float,\n", + " sp: int,\n", + " ) -> None:\n", + " \"\"\"Observe a transition and update the agent's knowledge.\"\"\"\n", + " # Direct RL update\n", + " self.q_update(\n", + " s,\n", + " a,\n", + " r,\n", + " sp,\n", + " ) # Direct RL update : this is learning from real experience (interaction with the environment).\n", + "\n", + " # Model update = this is learning from simulated experience\n", + " key = (s, a) # key identifies the state-action pair.\n", + " if key not in self.model: # If this (s,a) has never been seen,\n", + " self.observed_sa.append(\n", + " key,\n", + " ) # record it in observed_sa so planning can use it.\n", + " self.model[key] = (\n", + " r,\n", + " sp,\n", + " ) # Now we update the model : “If in state s take action a, we will get reward r and go to state sp.”\n", + "\n", + " # Planning = This is the “Dyna” part.\n", + " for _ in range(self.n): # for each planning step (n steps in total)\n", + " ss, aa = self.observed_sa[\n", + " 
int(self.rng.integers(len(self.observed_sa)))\n", + " ] # Sample a previously observed (s,a) pair uniformly at random\n", + " rr, ssp = self.model[\n", + " (ss, aa)\n", + " ] # Use the model to retrieve its predicted reward and next state (rr, ssp). Hint. use self.model[key], what is the key here ?\n", + " self.q_update(\n", + " ss,\n", + " aa,\n", + " rr,\n", + " ssp,\n", + " ) # Apply the same Q-learning update as if it were a real transition. Hint. use self.q_update(ss, aa, rr, ssp)" + ] + }, + { + "cell_type": "markdown", + "id": "2688c480", + "metadata": {}, + "source": [ + "\n", + "**Exercise 3.** Reproduce Figure 8.2\n", + "\n", + "Run the experiment:\n", + "- 50 episodes\n", + "- Average over 30 independent runs\n", + "- Compare `n = 0`, `n = 5`, `n = 50`\n", + "- Plot **steps per episode vs episodes**\n" + ] + }, + { + "cell_type": "markdown", + "id": "a6589733", + "metadata": {}, + "source": [ + "\n", + "*Hints*\n", + "\n", + "- Use the **same random seed across different `n` values** for each repetition\n", + "- Episode 1 should be identical for all `n`\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "206bf2bb", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "def run_episodes(\n", + " env: DynaMazeEnv,\n", + " planning_steps: int,\n", + " episodes: int = 50,\n", + " seed: int = 0,\n", + ") -> np.ndarray:\n", + " \"\"\"Run one experiment of Dyna-Q on a given environment, for a fixed number of episodes and planning steps.\n", + "\n", + " Args:\n", + " env (DynaMazeEnv): the maze environment\n", + " planning_steps (int): the Dyna parameter n = number of model/planning updates per real step.\n", + " episodes (int, optional): number of episodes to run. Defaults to 50.\n", + " seed (int, optional): random seed for reproducibility. Defaults to 0.\n", + "\n", + " \"\"\"\n", + " rng = np.random.default_rng(\n", + " seed,\n", + " ) # create a random number generator for reproducibility\n", + " agent = DynaQAgent(\n", + " env.n_states, env.n_actions, planning_steps=planning_steps, rng=rng,\n", + " ) # Initializes a new Dyna-Q agent with:\n", + " # Q-table size (n_states, n_actions)\n", + " # planning_steps = the Dyna parameter n = number of model/planning updates per real step.\n", + " # rng for reproducibility\n", + "\n", + " steps_per_episode = [] # This will store one integer per episode: the number of steps until reaching goal.\n", + "\n", + " for _ in range(episodes): # Loop episodes\n", + " s = env.reset() # Reset the environment at the start for each episode\n", + " done = False # done is the terminal indicator from the environment, it is false at the beginning\n", + " steps = 0 # steps counts how many environment interactions happen this episode.\n", + " while not done: # interaction until terminal\n", + " a = agent.act(s) # Choose action using epsilon-greedy policy\n", + " sp, r, done = env.step(\n", + " a,\n", + " ) # this is the environment transition: sp = next state s'; r = reward ; done = whether goal reached.\n", + " agent.observe(\n", + " s, a, r, sp,\n", + " ) # This is the whole Dyna update : the agent will learn from this real 
transition + plan from model\n", + " s = sp # this line advances state variable, now sp becomes the current state\n", + " steps += 1 # add + 1 into the step counter\n", + " steps_per_episode.append(steps) # Store episode length after reaching the goal\n", + "\n", + " return np.array(steps_per_episode) # return the number of step as NumPy array\n", + "\n", + "\n", + "def replicate_figure() -> None:\n", + " \"\"\"Run multiple independent experiments and plot averages for different planning steps n.\"\"\"\n", + " env = DynaMazeEnv() # call the environment\n", + " ns = [\n", + " 0,\n", + " 5,\n", + " 50,\n", + " ] # These are different values of Dyna-Q planning step where 0 = pure Q-learning (no planning)\n", + " runs = 30 # runs: number of independent experimental runs (different seeds)\n", + " episodes = 50 # episodes: number of episodes per run\n", + " results = {n: [] for n in ns} # Storage dictionary, which creates a dictionary like\n", + "\n", + " for rep in range(runs): # Run experiments\n", + " seed = rep # TO DO # choose seeds, what is your stratgy to choose different seed ? Hint. 
for each rep, we need to choose different seed.\n", + " for (\n", + " n\n", + " ) in ns: # Inner loop: compare different planning values using the same seed\n", + " env = DynaMazeEnv() # call a new environment for this condition\n", + " results[n].append(\n", + " run_episodes(env, n, episodes, seed),\n", + " ) # run_episodes creates a new agent and runs 50 episodes, returns an array of length 50\n", + " # Append that array into results[n]\n", + "\n", + " avg = {\n", + " n: np.mean(results[n], axis=0) for n in ns\n", + " } # results[n] is a list of arrays ; axis=0 means average over runs, for each episode index.\n", + "\n", + " for n in ns: # Plots one curve per n with X-axis = episode number (0, ..., 49) and Y-axis = average steps to reach goal\n", + " plt.plot(avg[n], label=f\"n={n}\")\n", + "\n", + " plt.xlabel(\"Episodes\")\n", + " plt.ylabel(\"Steps per episode\")\n", + " plt.legend()\n", + " plt.show()\n", + "\n", + "\n", + "replicate_figure()\n" + ] + }, + { + "cell_type": "markdown", + "id": "22d14dd3", + "metadata": {}, + "source": [ + "**Exercise 4.** How and why does increasing the planning step `n` speed up learning?\n" + ] + }, + { + "cell_type": "markdown", + "id": "e763d124-f973-4c90-b23f-986cef3b7426", + "metadata": {}, + "source": [ + "More planning steps -> more simulated Q-updates per real step -> faster\n", + "\n", + "value propagation -> improved sample efficiency (fewer real interactions)." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "studies", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}