In the previous article we covered the theory and a basic implementation of one of the most popular reinforcement learning algorithms: Q-learning. The classic approach with a Q matrix has one clear disadvantage, though. The size of the Q matrix is the number of states multiplied by the number of available actions, so for complex environments the matrix becomes enormous, which demands a lot of memory and computing power and leads to long training times. Today we will try to solve this problem.

NOTE!

Before starting this article, please read the previous one where we explain the basics of how Q-learning works. Link: https://wizzdev.com/blog/how-to-implement-q-learning-for-gymnasium/

The plan of the article is as follows:

- Going deeper,
- Loss function,
- Learning techniques,
- Learning loop,
- Python code implementation,
- Summary.

**Going deeper**

As we’ve already established, sometimes mapping to tables is simply too inefficient or impossible. Therefore, we need a tool that is good at estimating the value of the Q function, takes up less memory and is computationally efficient. We need to go deeper than the matrix. The answer (as the name suggests) is in deep neural networks!

Let’s go back to what neural networks are best at: they are general-purpose function approximators. Given enough examples, a network can be trained to approximate even complex, multidimensional functions, and the Q function is exactly such a function. It is easy to see that this is a remedy for our problem!

Adding a neural network model to our algorithm solves the problem of representing the space of possible values of the Q function. The network takes a state as input and predicts the value of the Q function for each of the n possible actions. In the end we simply execute the action with the highest predicted value.

Source: https://www.researchgate.net/figure/Q-Learning-vs-Deep-Q-Learning_fig1_351884746

**Loss function**

However, the question arises: how do we train such a neural network model? Let’s use what we already know. Generally speaking, we train neural networks by minimizing a certain cost function with the help of gradient methods. From the previous article we know that we can express the relationship between the current and the next state with the help of **Bellman’s equation**:

$$Q(s, a) = r + \gamma \max_{a'} Q(s', a')$$

which means that the maximum total reward for taking action *a* in state *s* is the immediate reward *r* plus the discounted maximum future reward from the next state, $\gamma \max_{a'} Q(s', a')$. If we write the learning task for a neural network as a classic minimization of the squared difference between the expected value and the predicted value (the output of the model), we obtain the popular Mean Squared Error (MSE) loss function:

$$\text{loss} = \frac{1}{2}\left[r + \gamma \max_{a'} Q(s', a') - Q(s, a)\right]^2$$
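To make the formula concrete, here is a minimal sketch that computes the target and the loss for a single transition in plain Python. The function names and all numbers are made up for illustration, and the special case for terminal transitions (where there is no next state, so the target is just the reward) is an addition that the formula above does not spell out.

```python
def td_target(reward, next_q_values, gamma, terminated):
    """Bellman target: r + gamma * max_a' Q(s', a').

    For a terminal transition there is no next state, so the target
    is just the reward (an assumption added for this sketch).
    """
    if terminated:
        return reward
    return reward + gamma * max(next_q_values)


def td_loss(current_q, target):
    """Half of the squared difference between target and prediction."""
    return 0.5 * (target - current_q) ** 2


gamma = 0.9
next_q = [0.2, 0.5, 0.1, 0.4]  # hypothetical Q(s', a') for four actions

target = td_target(reward=0.0, next_q_values=next_q, gamma=gamma, terminated=False)
loss = td_loss(current_q=0.3, target=target)
print(f"target={target:.4f} loss={loss:.5f}")
```

Here the best next action is worth 0.5, so the target is 0.9 * 0.5, and the loss measures how far the current estimate of 0.3 is from it.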

And that’s basically it! We now have a neural network model that lets the algorithm handle complex, high-dimensional state spaces, which makes it suitable for demanding tasks such as controlling robots. Can we start implementing the solution right away? Let’s slow down for a moment and discuss what the learning process looks like and what improvements we can make.

**Learning techniques**

We will introduce two more (actually three, but we already know one from the previous article) improvements to our algorithm before implementing it in Python. Firstly, let’s introduce the concept of **experience replay**. Consecutive steps of an episode are strongly correlated with each other, and training a neural network on such correlated data makes it unstable and prone to getting stuck in local minima. In the Deep Q-learning algorithm we can break these cause-and-effect relationships by storing experience in a buffer instead of updating the Q network after every new experience. Throughout the training process, at each step we store a tuple containing *state, action, reward* and *next state* in the buffer. During the backpropagation step we don’t use the data from the current step but a batch of randomly selected samples from the replay memory. Samples selected this way are much less correlated with each other than consecutive samples from the history, which makes learning more stable and more resistant to getting stuck in a local minimum.

Source: https://wpumacay.github.io/research_blog/posts/deeprlnd-project1-part1/

Secondly, we will use something we already know: the **exploration-exploitation** balance, a trade-off classically illustrated by the **multi-armed bandit problem**. As in the previous article, we will use the ε-greedy strategy.

NOTE!

For a full explanation of this topic refer to the previous article in this series.
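As a quick reminder, the ε-greedy rule can be sketched in a few lines of plain Python. The Q values below are made up, the function names are hypothetical, and the linear decay is just one simple schedule (it happens to match the one used in the training code later in this article).

```python
import random


def epsilon_greedy(q_values, epsilon, rng=random):
    """Random action with probability epsilon, otherwise the best-valued one."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))
    return max(range(len(q_values)), key=lambda a: q_values[a])


def decay(epsilon, episodes):
    """Linear decay that reaches 0 after `episodes` calls."""
    return max(epsilon - 1 / episodes, 0)


q_values = [0.1, 0.7, 0.3, 0.2]  # hypothetical network output for one state
epsilon = 1.0                    # start fully random
episodes = 4
history = []
for _ in range(episodes):
    action = epsilon_greedy(q_values, epsilon)
    epsilon = decay(epsilon, episodes)
    history.append(epsilon)

print(history)
```

With ε = 1 every action is random (pure exploration); once ε reaches 0 the agent always picks the action with the highest predicted Q value (pure exploitation).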

And finally, great solutions sometimes require more complex approaches, so we are going to use **two neural networks** instead of one! We need to deal with the problem of moving targets during training: the target value in the loss function is computed by the very network we are updating, so every update also shifts the target it is chasing. We will solve this by adding a second network with exactly the same architecture, but delayed relative to the main one. Let’s call these networks **QTarget** and **QPolicy**, where QTarget is a delayed copy of QPolicy. Periodically, every certain number of steps, we will synchronize the networks with each other. This way we provide a stable target for the Q value between synchronizations.
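The idea of a delayed copy can be illustrated without any neural network at all. In the sketch below, plain dictionaries stand in for the weights of the two networks, the "gradient update" is a placeholder that just nudges the numbers, and `SYNC_EVERY` is a hypothetical interval chosen for the example.

```python
import copy

SYNC_EVERY = 3  # hypothetical sync interval, in training steps

policy_weights = {"w": [0.0, 0.0]}              # stand-in for QPolicy parameters
target_weights = copy.deepcopy(policy_weights)  # QTarget starts as an exact copy

sync_steps = []
for step in range(1, 8):
    # Stand-in for a gradient update: only the policy network changes.
    policy_weights["w"] = [w + 0.1 for w in policy_weights["w"]]

    if step % SYNC_EVERY == 0:
        # Hard update: overwrite the target with the current policy weights.
        target_weights = copy.deepcopy(policy_weights)
        sync_steps.append(step)

print(sync_steps, policy_weights, target_weights)
```

Between synchronizations the target stays frozen while the policy keeps changing, which is exactly what keeps the training target stable. With PyTorch models the copy is done with `target_dqn.load_state_dict(policy_dqn.state_dict())`, as in the implementation later in this article.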

**Learning loop**

We already know what changes we need to make compared to the classic approach and which improvements we should add. Let’s now put all the pieces of the puzzle together and create a training loop. This will be the last step before implementing our solution. The full training loop looks as follows:

NOTE!

Try to analyze the diagram and write out all the steps in order, paying attention to which steps will be in a loop. Don’t worry if not everything is clear now, code implementation analysis will help you understand it.

**Python code implementation**

Let’s start by discussing the task we will try to solve today. Just like last time, we will choose an environment from the Python Gymnasium library. We decided to choose today’s example for climbing fans – we will try to optimize the path on the cliff while being careful not to fall into the abyss.

Link to today’s environment: https://gymnasium.farama.org/environments/toy_text/cliff_walking/.

Analyzing the documentation, we can see that we have 4 possible moves and 48 possible states. The way of representing states is quite interesting; the same notation is used to represent 2D occupancy grid maps in robotics:

*index = current_row * number_of_columns + current_column*
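A quick way to check this mapping is to encode and decode a few positions by hand. The sketch below assumes the 4 x 12 grid described in the environment documentation, so each row is 12 cells long and the row number is multiplied by 12; the helper names are made up for this example.

```python
N_ROWS, N_COLS = 4, 12  # CliffWalking grid shape: 4 rows, 12 columns


def to_index(row, col):
    """Flatten a (row, col) position into a single state index."""
    return row * N_COLS + col


def to_row_col(index):
    """Recover (row, col) from a state index."""
    return divmod(index, N_COLS)


start = to_index(3, 0)   # bottom-left corner
goal = to_index(3, 11)   # bottom-right corner
print(start, goal, to_row_col(goal))
```

The bottom-right corner maps to state 47, the goal state referred to throughout the rest of this article.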

Analyzing the documentation even further, we can notice a certain problem with the reward. Until now we assumed that the episode ends when we either reach the goal and get a reward, or make a move that earns a big enough punishment, here slipping off the cliff. In this environment, however, the episode ends only when we reach the goal (state 47). It does not terminate when we slip off the cliff; we only receive a punishment of -100 points. This conflicts with our assumptions, but don’t worry: at this point we can call ourselves machine learning developers and we will handle this problem.

While working with neural networks it is also important to remember that feeding values of very different magnitudes into training is not always a good approach: we have less control over the process, which usually results in longer training times and a less effective network. That’s why we usually normalize the data. In our example each step incurs a reward of -1, unless the player steps into the cliff, which incurs a reward of -100. That’s quite a big discrepancy, and something we will have to address later.

We have analyzed the entire problem and come to a fundamental conclusion: often, before we start implementing a solution, we need to examine the problem closely and build a bridge between our assumptions and what the environment actually gives us.

Create a new project and install the required packages:

numpy==2.0.0

gymnasium==0.29.1

pygame==2.6.0

tqdm==4.66.4

matplotlib==3.9.1

torch==2.3.1

torchvision==0.18.1

We know the theoretical intro, environment and what to watch out for. Let’s start coding with something simple at first.

Model implementation:

import torch
from torch import nn


class DQN(nn.Module):
    """
    Deep Q-learning Neural Network model.
    """

    def __init__(self, in_states: int, out_actions: int):
        super().__init__()

        # Constants
        self.L1_NODES = 48  # Number of neurons in the hidden layer

        # Activation functions
        self.relu = nn.ReLU()

        # Layers
        self.block_1 = nn.Linear(in_features=in_states, out_features=self.L1_NODES)
        self.block_2 = nn.Linear(in_features=self.L1_NODES, out_features=out_actions)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.block_1(x)
        x = self.relu(x)
        x = self.block_2(x)
        return x

There is no single right answer to the question of which model to use. The degree of complexity and sophistication depends on the task. In our case we will use a very simple network consisting of two fully connected layers with one ReLU activation function between them. If you don’t have a powerful GPU for training NN models, don’t worry: this example is so simple that we will use only the CPU. The implementation is a classic custom model built with the PyTorch library, where, in addition to declaring the layers, we also need to implement the forward function. Let’s save this as *nn_model.py*.

Next let’s focus on replay memory implementation.

from collections import deque
import random


class ReplayMemory:
    """
    Memory for Experience Replay
    """

    def __init__(self, max_len: int):
        # Once max_len is reached, the oldest transition is dropped automatically
        self.memory = deque([], maxlen=max_len)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, sample_size: int = 1):
        # Return sample_size random transitions, drawn without replacement
        return random.sample(self.memory, sample_size)

    def __len__(self):
        return len(self.memory)

We could use a simple list for this, but we decided on something more convenient: a *deque* with a maximum length, which automatically discards the oldest experience once the buffer is full. What is noteworthy about this class is the additional method that returns a given number of randomly selected samples. Place this class in *replay_memory.py*.
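To see how such a buffer behaves, here is a small sketch that uses `deque` directly, with a deliberately tiny capacity and made-up transitions in the same (state, action, new_state, reward, terminated) order as the training code.

```python
from collections import deque
import random

memory = deque([], maxlen=3)  # tiny buffer so the eviction is easy to see

for step in range(5):
    # (state, action, new_state, reward, terminated) with made-up values
    memory.append((step, 0, step + 1, 0, False))

oldest_state = memory[0][0]        # the two earliest transitions were evicted
batch = random.sample(memory, 2)   # two distinct transitions, in random order

print(len(memory), oldest_state, batch)
```

Note that `random.sample` raises a `ValueError` when asked for more elements than the buffer holds, which is why training should only start once enough experience has been collected.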

And now the moment we’ve all been waiting for – main class implementation. We will discuss it method by method.

class CliffWalkingDQN:
    """
    Cliff Walking Deep Q-learning implementation
    """

    def __init__(self):
        # Hyperparameters
        self.learning_rate = 0.001
        self.discount_factor = 0.9
        self.network_sync_rate = 10
        self.replay_memory_size = 1000
        self.mini_batch_size = 32

First, we create a class to handle the entire training and testing process, and declare all training variables in its constructor. Nothing complex here: we know the *learning rate* and *discount factor* from the previous article, *network_sync_rate* is the number of steps the agent takes before syncing the policy and target networks, *replay_memory_size* is, as the name suggests, the buffer size for the replay memory, and *mini_batch_size* tells us how many samples we randomly select from the replay memory. There are two main methods in this class that we need to implement: *train* and *test*.

Starting with *train:*

    def train(self, episodes: int = 1000, render: bool = False):
        # Prepare environment
        env = gym.make("CliffWalking-v0", render_mode="human" if render else None)
        num_states = env.observation_space.n
        num_actions = env.action_space.n

        epsilon = 1  # exploration_factor: 1 = 100% random actions, explore at the beginning
        memory = ReplayMemory(self.replay_memory_size)

        # Create policy and target neural networks
        policy_dqn = DQN(in_states=num_states, out_actions=num_actions)
        target_dqn = DQN(in_states=num_states, out_actions=num_actions)

        # Be sure that both models have the same weights
        target_dqn.load_state_dict(policy_dqn.state_dict())

        # Training components
        self.loss_fn = nn.MSELoss()
        self.optimizer = torch.optim.Adam(policy_dqn.parameters(), lr=self.learning_rate)

        rewards_per_episode = np.zeros(episodes)
        epsilon_history = []
        reward = None
        step_count = 0

Skipping the environment declaration, we create two NN models whose input layer has one neuron per state and whose output layer has one neuron per available action. We initially synchronize them by copying the weights from the policy network to the target network. Training these models is a regression task: the network learns to predict Q values, so we use the mean squared error loss function together with the Adam optimizer. Additionally, we create a few helper variables.
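The training code that follows calls a helper, `state_to_dqn_input(state, num_states)`, which is not listed in this article. Since the input layer has one neuron per state, a natural guess is that it one-hot encodes the state index. The sketch below illustrates that idea with plain Python lists; it is an assumption, not the repository’s implementation, and a real version would have to return a float `torch.Tensor` so that it can be fed to the network.

```python
def one_hot(state, num_states):
    """Return a list of length num_states with a single 1.0 at index `state`.

    Hypothetical stand-in for the article's state_to_dqn_input helper.
    """
    if not 0 <= state < num_states:
        raise ValueError(f"state {state} is outside [0, {num_states})")
    encoded = [0.0] * num_states
    encoded[state] = 1.0
    return encoded


vector = one_hot(36, 48)  # 36 is the start state of CliffWalking
print(vector.index(1.0), len(vector))
```

With this encoding every state activates exactly one input neuron, so the first layer of the network effectively learns a separate set of weights for each state.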

Now to the main training loop:

        print("--- Training ---")
        for i in tqdm(range(episodes)):
            state = env.reset()[0]  # Reset the environment
            terminated = False
            truncated = False

            while not terminated and not truncated:
                # Select action based on epsilon-greedy
                if random.random() < epsilon:
                    # Select random action
                    action = env.action_space.sample()
                else:
                    # Select best action
                    with torch.no_grad():
                        action = policy_dqn(state_to_dqn_input(state, num_states)).argmax().item()

                # Execute action (keep the truncated flag so the loop condition can see it)
                new_state, action_reward, terminated, truncated, _ = env.step(action)

                # Process reward and observations
                terminated, reward = self._process_epizode(action_reward, new_state)

                # Save experience into memory
                memory.append((state, action, new_state, reward, terminated))

                # Move to the next state
                state = new_state

                # Increment step counter
                step_count += 1

            # Keep track of the rewards collected per episode.
            if reward == 1:
                rewards_per_episode[i] = 1

            # Check if enough experience has been collected and if at least 1 reward has been collected
            if len(memory) > self.mini_batch_size and np.sum(rewards_per_episode) > 0:
                mini_batch = memory.sample(self.mini_batch_size)
                self._optimize(num_states, mini_batch, policy_dqn, target_dqn)

                # Decay exploration_factor
                epsilon = max(epsilon - (1 / episodes), 0)
                epsilon_history.append(epsilon)

                # Synchronize policy network to target network after a certain number of steps
                if step_count > self.network_sync_rate:
                    target_dqn.load_state_dict(policy_dqn.state_dict())
                    step_count = 0

Until the action is executed, everything looks familiar; it is even identical to what we did in the previous article. Interesting things start to happen only later. Let’s go back for a moment to the beginning of this chapter, where we said that we had to process the reward and state. We implement this functionality in a helper method:

    def _process_epizode(self, reward: int, new_state: int) -> List[Union[bool, int]]:
        terminated = False
        output_reward = 0

        if new_state == 47:
            # Reached the goal! Episode ends
            terminated = True
            output_reward = 1
        elif reward == -100:
            # Slipped off the cliff: end of episode
            terminated = True
            output_reward = 0

        return [terminated, output_reward]

Based on the raw reward and the new state, we manually override the *terminated* flag and the reward: reaching the goal ends the episode with a reward of 1, slipping off the cliff ends it with a reward of 0, and every other step continues with a reward of 0. It is just a simple bunch of if statements.

After processing these values we store them in the experience replay buffer and, as usual, move on to the next iteration as long as the simulation is not terminated. The last piece of the puzzle is the training itself. Once we have collected enough experience and earned at least one reward, we can move on to backpropagation. This functionality is implemented in another helper method, *_optimize*:

    def _optimize(self, num_states: int, mini_batch: List, policy_dqn: DQN, target_dqn: DQN):
        # Prepare placeholders for target and actual q values
        current_q_list = []
        target_q_list = []

        # Iterate over saved memory
        for state, action, new_state, reward, terminated in mini_batch:
            if terminated:
                # Agent either reached goal (reward = 1) or slipped off the cliff (reward = 0)
                # Target q value should be set to the reward.
                target = torch.tensor([reward])
            else:
                # The agent neither fell nor reached his goal
                with torch.no_grad():
                    target = (
                        torch.tensor(reward)
                        + self.discount_factor * target_dqn(state_to_dqn_input(new_state, num_states)).max()
                    )

            # Get the current set of Q values
            current_q = policy_dqn(state_to_dqn_input(state, num_states))
            current_q_list.append(current_q)

            # Get the target set of Q values
            target_q = target_dqn(state_to_dqn_input(state, num_states))
            target_q_list.append(target_q)

            # Associate action with target
            target_q[action] = target

        # --- Back propagation ---
        # Compute loss for the whole minibatch
        loss = self.loss_fn(torch.stack(current_q_list), torch.stack(target_q_list))

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

The last step is to synchronize the target network with the policy network after a certain number of steps. After the training has ended, we additionally save the policy network’s weights, and that’s basically it for the training part.

NOTE!

Take a moment to analyze our implementation and compare it to the diagram from the Learning loop chapter.

Last part – test method:

    def test(self, episodes: int = 100):
        # Prepare environment
        env = gym.make("CliffWalking-v0", render_mode="human")
        num_states = env.observation_space.n
        num_actions = env.action_space.n

        # Load learned policy
        policy_dqn = DQN(in_states=num_states, out_actions=num_actions)
        policy_dqn.load_state_dict(torch.load("cliff_walking_dql.pt"))
        policy_dqn.eval()  # switch model to evaluation mode

        num_success = 0

        print("--- Test ---")
        for _ in tqdm(range(episodes)):
            state = env.reset()[0]  # Reset to the start state (36)
            terminated = False
            truncated = False
            step_count = 0

            while not terminated and not truncated:
                # Select best action
                with torch.no_grad():
                    action = policy_dqn(state_to_dqn_input(state, num_states)).argmax().item()

                # Execute action
                state, reward, terminated, truncated, _ = env.step(action)
                step_count += 1

                if reward == -100:
                    # Agent slipped off the cliff
                    break

                if state == 47:
                    # Agent reached goal!
                    num_success += 1
                    break

                if step_count > 50:
                    # Agent made too many moves
                    break

        env.close()
        print(f"Success rate: {num_success / episodes * 100} %")

Let’s place it all in the *cliff_walking_dqn.py* file and try to run it. The full implementation is available in our GitHub repository: https://github.com/wizzdev-pl/Q-learning-blog-post . Please check out the complete version of this file (with imports and the main function) there.

**Result:**

We have successfully trained our model! As we can see, the agent does not fall off the cliff and chooses the optimal path to the target.

**Summary**

Deep Q-learning is an advanced reinforcement learning algorithm that integrates deep learning with Q-learning. In this article, we addressed the basic problem of the classical implementation of the algorithm: it is only practical for environments with small state and action spaces, because otherwise the Q matrix becomes huge. We learned how to replace the Q matrix with a neural network, what other improvements we can add for training and how to adapt different environments to our solution. We encourage you to try implementing the Deep Q-learning algorithm for other environments in the Gymnasium library!

**External links:**