30 min readAdvantage Actor-Critic with Proximal Policy Optimization: A Journey Through Code

Advantage Actor-Critic with Proximal Policy Optimization: A Journey Through Code

Whether you are a student in Georgia Tech's OMSCS CS 7642 Reinforcement Learning course or a curious learner, this article aims to discuss the low level code implementation of A2C w/ PPO through pseudocode.

This article is part of a series of articles for the OMSCS CS 7642 Reinforcement Learning class.

OMSCS Reinforcement Learning Series:

  1. Georgia Tech Reinforcement Learning: Preparing for Success
  2. Single Agent Reinforcement Learning: Basic Concepts and Terminologies
  3. Turbocharging Advantage Actor-Critic with Proximal Policy Optimization
  4. Advantage Actor-Critic with Proximal Policy Optimization: A Journey Through Code
  5. Multi-Agent Reinforcement Learning Soft Introduction: Cooperation

Friendly Caution

Hello dear reader! I'm thrilled that I get to share my knowledge. However, I am still a student, journeying through the complex field of computer science. While I strive to present information that is accurate and insightful, there may be instances where I misinterpret or oversimplify concepts. Please approach this article with a curious and critical mind and consult the learning material sections for the most accurate and comprehensive information available.

Learning Through Code

Inspired by the many great resources out there, I am a big believer of learning through the examples of other. For better examples of working code I have provided two resources that can help you understand and implement complete code on the Advantage Actor-Critic with Proximal Policy Optimization.

Learning Materials:

A2C with PPO - Experience Collection, Epochs, and Batch Updates

# pseudocode
iterations = 1000  # number of iterations to run agent training
K = 10  # number of epochs
N = 1  # number of workers
T = 400  # max time horizon for the trajectory
M = 10  # number of experiences inside a batch, M <= N*T
actor_nn, critic_nn = Actor_NeuralNetwork, Critic_NeuralNetwork
old_actor = copy.deepopy(actor_nn)
shared_experience_buffer = []

for _ in range(iterations):
	# make sure that all workers are using the same old_actor
	# important for the Importance Sampling calculation
	old_actor_nn = copy.deepopy(actor_nn)

	# imagine this running for each worker on their own CPU core
	# each worker is in charge of tracking their own simulation: agent positions, when to restart episode, etc
	# a single core method is to just keep collecting experiences until a max threshold in the shared_buffer has been reached that is N*T long
	for w in range(N):
		# 1. Run policy for T time steps
		# 2. Compute GAE using old actor network
		# 3. Calculate V-Target using critic network
		# experiences saved inside a shared buffer

	for epoch in range(K):
		# 4. Shuffle your experiences

		# collect minibatches of size M to update on
		for m in range(0, N*T, M)
			minibatch = shared_experience_buffer(m: m+M)

			# 5. Update Actor Network
				# 5a. Calculate Objective Surrogate Function using old_actor_nn and actor_nn
				# 5b. Loss = Objective + BH

			# 6. Update Critic Network 
				# 6a. Calculate V(s)
				# 6b. Loss = MSE(V-Target(s), V(s))

	# 7. Remove all experiences from shared buffer

Let's look at the general structure of what the code should look like.

Don't confuse agents with workers. Workers are just a simulated environment inside their isolated CPU core. An agent refers to the actual agent (or multiple agents) in the environment taking actions and receiving rewards.

Imagine each worker executing a simulation of the environment in their isolated core on the CPU. Each worker has their environment agent(s) perform actions based on the actor policy and save these experiences until a max timestep \(T_{threshold}\) threshold has been reached. Once each worker reaches that max timestep threshold \(T_{threshold}\), we can return that collection of experiences to the primary process for the batch updates.

The primary process collects and stores all experiences into a single shared buffer. This is ok because all workers have their agents making decisions on the same policy Neural Network, or rather, they all are using the same Actor Neural Network; there would be an issue if all agents were using different Neural Networks.

From there, the algorithm shuffles the batches on each epoch, a technique to help reduce variance.

From there, we update our actor and critic network.

The actor-network uses the surrogate objective function subtracted by the entropy regularization term \(\beta H(p, \pi)\).

It should be noted that we can collect experiences from a single worker. The main tradeoff is that although we collect "fuller" episodes, we do so slowly. On top of that literature in the field suggests that by updating from multiple agents, we can make use lots of "scenarios" quickly. Take for example T=10, in this case we can update different 10 step into the future scenarios from all of the workers, and therefore this update can be quickly used in the next 10 timesteps of the current episode.

I will note that 10 is too small; depending on the size problem you may need hundreds to thousands of collected timesteps to make meaningful updates.

Lastly, don't forget to remove everything from the shared experience buffer. Although the A2C with PPO algorithm allows for multiple updates on experiences from an older policy, once the epoch(s) portion is complete, we still need to collect new experiences from the newly updated actor Neural Network policy.

Using Multiple Processes

# Example of Multiple Processes
import multiprocessing as mp
from multiprocessing import Process, Queue, Manager, Lock
import random
import time

def worker(id, queue, run, shared_list, lock):
  increment = 0

  while True:
    if run.value:

      # random sleep event to simulate varying worker speed
      wait_time = random.randint(1, 5)
      time.sleep(wait_time)

      # lock/unlock ability to print to console
      lock.acquire()
      print(f"id: {id}, inc: {increment} -> add {shared_list[0]} to queue")
      lock.release()

      # send data up to main process and increment job count
      queue.put([shared_list[0]])
      increment += 1

def main():
  start_time = time.time()
  num_hello_worlds = 100
  message_count = 0
  hello_world_count = 0

  # variables used for processes, global sharing among processes
  num_workers = mp.cpu_count()
  processes = []
  queue = Queue()
  lock = Lock()
  manager = Manager()
  run = manager.Value("b", True)
  shared_list = manager.list()
  shared_list.append(0)

  print(f"num_workers: {num_workers}")

  # create individual processes based on CPU-core count
  for ind in range(num_workers):
    p = Process(target=worker, args=(ind, queue, run, shared_list, lock))
    p.start()
    processes.append(p)

  try:
    while hello_world_count < num_hello_worlds:
      run.value = True
      messages = []

      print("Start Simulation, Collect Messages from Queue")
      while len(messages) <= 10:
        message = queue.get()  # Wait for message from any worker
        messages.append(message)

      print("Collection Limit Reached!")
      hello_world_count += 1
      run.value = False

      print("Wait for other processes to finish simulation")
      p.join(6)  # wait 6 seconds for all processes to finish

      # grab leftover queue items
      while not queue.empty():
        message = queue.get()
        messages.append(message)

      # Execute Update -> Sleep for 5 seconds
      print("Execute Update -> Sleep")
      time.sleep(5)
      shared_list[0] += 1

      # clear the queue
      print("Clear Message Queue")
      while not queue.empty():
        queue.get()

  except KeyboardInterrupt:
    # clean up things when user presses Ctrl+C
    print("Terminating processes...")

  finally:
    # also runs when user presses Ctrl+C

    # remove processes, if skipped you'll have to manually remove in console
    for p in processes:
      p.terminate()
      p.join()

    # output total run time
    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")

First up is multi-processing.

This example should run in python and is a "simple" hello world of passing information between workers and the main thread.

The code above uses the multiprocessing module.

The code is "simple. " The main thread creates multiple workers whose job it is to sleep for a random time of no more than 5 seconds and send back a message to the main thread through the use of a shared Queue. The main thread grabs these messages from the Queue and stores them in a local buffer.

When the buffer reaches a threshold of 10, then the workers are updated to no longer execute; done by update the boolean shared Manager Value run = False. Since some workers are still in the process of working, the main thread still has to wait. This implementation is up to you in regards to A2C w/ PPO. In the context of this "simple" example, we simply wait 6 seconds, since workers can sleep for no more than 5 seconds.

After the main thread can then run its update, which in this case is to sleep for 5 seconds. Once the main thread finishes its update, we need to clear the shared Queue of any missed messages.

With a fresh Queue, we can then update the boolean shared Manager Value run = True and the workers will then continue there work.

This is repeated until some iteration threshold has been reached.

In Python, the go-to for parallel code execution is the Multiprocessing module.

There are four key components to consider: Process, Queue, Manager, Lock.

The Process class provides us the main way to have multiple functions run in parallel. The going wisdom online is to only create as many processes as there are cores; which can be found through multiprocessing.cpu_count().

The Queue class allows us to have access to a global queue that workers and the main thread have access to. In our case only the main thread should be grabbing from this. This is how we can get experiences or things from workers. queue.put() allows us to insert items while queue.get() allows us to retrieve and remove items.

The Manager class is an important class as it gives us the ability to share items, list, values amongst all worker processes. You'll have to learn this if you want to share things like Neural Networks, potentially through the use of a shared array.

The Lock class can be shared amongst all workers and it allows us to have access to a mechanism that helps prevent "race conditions." A race condition is a situation where two workers want to do something important at the same time, but by doing those things at the same time, the main thread could potentially run into issues. In the use case of Lock only one worker is allowed to run code execution. First a worker must grab the shared lock through lock.acquire() and once code execution is complete the worker must then release that lock through lock.release(). In the event a worker cannot acquire a lock, then their execution is simply halted, each agent waits patiently until the lock is available to grab.

One thing about the concept of locks in the implementation of A2C w/ PPO is that if you plan on doing calculations inside of each worker, then a lock is not necessary as the information gets shuffled once the main thread starts doing the policy updates. However, if you plan on doing pre-calculations, such as v-targets and GAE in the main thread, then the ordering matters and therefore using the Lock class becomes necessary.

The last thing you have to do is make sure the main thread has a proper way of deleting processes in the event that the main thread has finished or if a user simply terminates execution through the use of Ctrl+C. If this is not done, the workers simply keep on working and eventually you'll run into problems that require you find these workers and manually close them through the terminal.

Also, it's important to note that you don't need to use any form of multiprocessing to implement A2C with PPO. Although A2C allows for multiple workers, the biggest factor is the use of the advantage calculations and the synchronous nature of the algorithm.

You could simply collect experiences in the main method until the T timestamp threshold has been reached, and then do all the calculations from that point on.

I did both, and while using multiple processes is faster, removing the use of multiprocessing simplifies the implementation and reduces the potential for errors in your algorithm. It basically, allows you to focus more on the algorithm details rather than worrying about the correctness of parallel computing.

It's important to note that in environments where rewards are scarce, we do not want to throw away any experiences as they may be important for updating. So consider how you retrieve experiences in your implementation. Sometimes its ok to throw away or clear items in the queue, other times it might be beneficial to grab all experiences we can.

Environment for Workers

# Pseudocode
import gym
env = gym.make('CartPole-v1')

# state and info of environment from gym library
curr_state, info = env.reset()

# experience buffer
local_experience_buffer = []

# Timestep collection of experiences
T = 400
t = 0

# TODO: modify based on learning algorithm used
def action_selection_strategy(curr_state):
	return env.action_space.sample()

# TODO: modify 
def send_info_to_main_thread():
	# Lock Acquire
	# 1. Calculate the Advantages 
	# 2. Calculate the v-Targets
	# 3. Send Experience Buffer, Advantages, v-Targets to main process
	# 4. Clear Local Experience Buffer
	# Lock Release

# Main process determines when worker terminates
while True:
	terminated, truncated = False, False
	curr_state, info = env.reset()

	while not terminated and not truncated:
		if t >= T:
			send_info_to_main_thread()
			t = 0

		# decide the action of the agent
		# example only picks a random action
		agent_action = action_selection_strategy(curr_state)

	    next_state, reward, terminated, truncated, info = env.step(agent_action)

		# what you save per timestep is up to you
		local_experience_buffer.append(curr_state, agent_action, reward, next_state, terminated, truncated)

		# check if agent has found the goal state or time limit has been reached
	    if terminated or truncated: break

		# Keep track of agents current state, forgetting leads to pain!!!
		curr_state = next_state
		t += 1

env.close()

This is a straightforward environment of the gym library; it's also a modified version of the original code that can be found in Gyms Basic Usage Tutorial.

There is a flow to model-free reinforcement learning algorithms.

The agent starts in a specific state and keeps track of its current state in the world. It selects an action based on some strategy. From this state-action pair, the agent "experiences" the world by receiving a reward and transitioning to a new state.

Depending on the learning algorithm chosen, the agent can either learn from the experience immediately or save that experience in some type of buffer.

The agent then updates its current state and repeats the entire learning sequence until it reaches the terminal state.

In the case of PPO, this example has the worker perform the calculations for advantages and v-targets.

Note that you can also collect experiences in the main thread, which would be equivalent to having one worker. The main goal is to collect experiences, calculate the advantages, and then run multiple epoch updates for both the Actor and Critic neural networks.

I guarantee that no matter the type of implementation, whether single-core or parallel multi-core, the algorithm will still learn, although in the case of single-core, it will be slower.

A2C Actor and Critic Neural Networks

The following code presented in this section is just modified code found in the pytorch documentation. Please learn from it, but don't copy the hyperparameters or code structure, as they are not flexible for large problems. I'll go over explanations for both the actor and critic algorithms.

Keep in mind that Pytorch works with tensors, which is similar to Numpy. If you are used to working in Numpy and there is a function in Numpy that gets what you need to be done, then Pytorch most likely has a similar function.

Lastly, pay attention to your hyperparameter choices, such as the learning rate and the neural network size, as they can tell you more about the environment and the problem at hand. In the case of neural networks, a small neural network tells you that the environment is simple, while a more extensive neural network tells you that the problem has many complicated patterns. The size of the learning rate can tell you a lot about the algorithm choices. For example, a small learning rate leading to success may tell you that the samples you are collecting are low quality or that the algorithmic update on the neural network is mainly unstable. A larger learning rate working well may mean that the experiences you collect are more meaningful or that the algorithm of choice is making better, high-quality updates.

In summary, the problem you are trying to solve will determine all the hyperparameter choices for the A2C Neural Networks. Hyperparameter discovery is usually done either through a trial-and-error strategy or a systematic approach such as a grid or random search.

Actor Neural Network

import torch
from torch import nn
import torch.optim as optim

class Actor_NeuralNetwork(nn.Module):
    def __init__(self, state_space, action_space):
        super().__init__()

		# create NN model
        self.linear_relu_softmax_nn = nn.Sequential(
            nn.Linear(state_space, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, action_space),
            nn.Softmax(dim=-1),
        )

        # define optimizer with learning rate
        # used to update NN parameters with the Adam optimizer
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.01)

    def forward(self, curr_state):
        logits = self.linear_relu_softmax_nn(curr_state)
        return logits

	def action_selection(curr_state):
		action_probabilities = self.forward(curr_state)
		return torch.distributions.categorical.Categorical(probs=action_probabilities).sample().item()

	def backpropagate(self, objective, beta, entropy):
		loss = -(objective + (beta * entropy))
		loss.backward()

		# reset gradients and update hyperparameters based on loss function
		self.optimizer.zero_grad()
		self.optimizer.step()

The Actor_NeuralNetwork class initializes from the parent nn.Module module, which contains all the PyTorch functionality.

There are several important aspects to consider for the actor neural network, so let's get into it.

First, we need to decide on an activation function. Although the code uses a ReLU activation function, academic papers often recommend Tanh as the preferred activation function for A2C with PPO.

Second, we use the state space size for the input layer and the action space size for the output layer. This allows the actor neural network to map states to actions.

Another key aspect is using Softmax(dim=-1) as the final activation function. This turns the last layer into a probability distribution where all nodes sum up to 1, providing probability outputs for all of the agent's actions.

When the neural network is first created, all nodes in the last layer have values close to uniform. This means that at the beginning of training, all actions are equally likely to be selected until the algorithm learns more.

Pay attention to the loss function -(objective + (beta * entropy)) and the use of the negative sign. The clipped surrogate objective function is a maximization technique. However, since we are using stochastic gradient descent or a similar method, we need to convert the maximization function into a minimization function by negating the number.

Lastly, consider how we pick the action. We send the state information to the neural network and get back the probability of selecting each action in a tensor array. From that tensor array, we use a function from PyTorch's categorical distribution to sample a single index based on their probability values using sample(), and return that as a regular Python integer using item().

Remember that neither the learning rate in this code nor the layer depth and width of the neural network will solve the CartPole problem. This code must be modified, but it provides a valuable learning experience nonetheless.

Critic Neural Network

import torch
from torch import nn
import torch.optim as optim

class Critic_NeuralNetwork(nn.Module):
    def __init__(self, state_space):
        super().__init__()

		# create NN model
        self.linear_relu_nn = nn.Sequential(
            nn.Linear(state_space, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 1),
        )

		# define the loss function
		self.criterion = nn.MSELoss()

        # define optimizer with learning rate
        # used to update NN parameters with the Adam optimizer
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.01)

    def forward(self, curr_state):
        logits = self.linear_relu_nn(curr_state)
        return logits

	def backpropagate(self, vtarget, vcurr):
		loss = self.criterion(vcurr, vtarget)
		loss.backward()

		# reset gradients and update hyperparameters based on loss function
		self.optimizer.zero_grad()
		self.optimizer.step()

It is very similar to the actor neural network; the main differences between the two are the output layer of the neural network and the calculation done during the backpropagation step.

First, let's look at the neural network. Notice that we take an input state and output a single value; this represents our mapping from state to state-value function.

Second, consider the backpropagation method. Here, we calculate the loss function for both the state-value function and the modified state-value target value. In this case, we use the Mean Squared Error (MSE) loss function. However, literature in the field suggests using the Huber Loss as it is considered a "middle ground" between MSE and the Mean Absolute Error (MAE).

Third, the activation function is important. Academic papers often prefer using Tanh as the activation function for A2C with PPO.

Lastly, we update the neural network's parameters on every update. If you'd like to accumulate the errors, omit the optimizer.zero_grad() and optimizer.step() calls until you are ready to perform the update. A better guide to this technique can be found here.

Generalized Advantage Function Code

#pseudocode
# main loop variable declarations
experience_buffer = [curr_state, action, reward, next_state, is_next_terminal]
critic_nn = Critic_NeuralNetwork()

def forward_gae_calculation():
	advantage_buffer = []
	_gamma, _lambda = .99, .99

	for i, _ in enumerate(experience_buffer)
		adv_range_buffer = advantage_buffer[i:]
		running_sum = 0.

		for index, item in enumerate(adv_range_buffer):
			curr_state, action, reward, next_state, is_next_terminal = item

			temporal_difference = reward + ((1 - is_next_terminal) * _gamma * critic_nn(next_state)) - critic_nn(state)
			running_sum += ((_gamma * _lambda) ** index) * temporal_difference

			# don't do calculations for a different episode
			if is_next_terminal: break

		advantage_buffer.append(running_sum)

	return advantage_buffer

def backward_gae_calculation():
	advantage_buffer = []
	_gamma, _lambda = .99, .99
	running_sum = 0.

	for index, item in enumerate(reversed(experience_buffer)):
		curr_state, action, reward, next_state, is_next_terminal = item

		temporal_difference = reward + ((1 - is_next_terminal) * _gamma * critic_nn(next_state)) - critic_nn(state)

		running_sum += temporal_difference + ((_gamma * _lambda * (1 - is_next_terminal)) * running_sum)

		advantage_buffer.append(running_sum)

	return advantage_buffer

I showcase both the forward and backward calculations of GAE for learning purposes.

The forward calculation has a time complexity equivalent to \(O(n^2)\), while the backward calculation method can be done in \(O(n)\). I recommend using the backward calculation.

The functions assume all workers are calculating GAE for their collection of experiences after the timestep \(T\) threshold has been reached. Also, remember that you can still collect experiences after an episode terminates, so you have to account for that in both the forward and backward GAE calculations.

Entropy Regularization (Shannons Entropy)


# main-loop variable declaration
actor_nn = Actor_NeuralNetwork

def entropy_regularization(states):
	beta = 0.01
	# returns tensor array of probability of actions
	logits = actor_nn(state).detach().numpy()
	# shannons entropy
	entropy = -np.sum(logits * np.log2(logits), axis=1, keepdims=True)

	return beta * entropy

This is a straightforward calculation of Shannon's Entropy. The use of ⁡\(log_{2}\) keeps the information in bits and emphasizes the binary nature of the calculation. Keep in mind that since ⁡\(log_{2}\) is used, the more actions the agent has, the higher the reward bonus for high entropy will be, where \(bonus \in [0, log_{2}(A)]\) and \(A\) is the set of actions belonging to the agent.

Understanding entropy regularization involves knowing the formula used to calculate entropy and the range of possible values. It also means understanding what the entropy calculation signifies in terms of how the entropy determines the distribution of the bonus value for the agent in unknown states.

Clipped Surrogate Function

#pseudocode
# main-loop variable declaration
minibatch_experience_buffer = [state, action, reward, next_state, is_next_terminal, is_truncated]
minibatch_advantage_buffer = [advantage_for_state_action_pair]

# calculation done on all minibatch experiences
def clipped_surrogate():
	_epsilon = .2
	pessimistic_updates = []

	for index, item in enumerate(minibatch_experience_buffer)
		state, action, _, _, _, _ = item
		adv = minibatch_advantage_buffer[index]

		importance_sample = actor_nn(state)[action] / old_actor_nn(state)[action]

		clipped = torch.clip(importance_sample, 1 - _epsilon, 1 + _epsilon) * adv
		unclipped = importance_sample * adv

		pessimistic_updates.append(np.min(clipped, unclipped))

	return pessimistic_updates

The concept of the clipped surrogate function is simple.

First calculate the importance sampling through the use of the original actor NN that made the policy selection and the updated actor NN from epoch updates. Using this importance sampling ratio, we then apply the clipped objective function to ensure that the policy update is not too large, which stabilizes training.

The aim is to create a pessimistic, or conservative update by comparing the unclipped and clipped versions of the importance sampling ratio multiplied by the advantage and selecting the one with the least impact on the update. This approach ensures that the update is within a specified range, preventing overly large updates that could destabilize the learning process.

Speeding Up Code Through Vectorization

Throughout all examples, I used a simple buffer to collect and utilize experiences. However, you can speed up your code by utilizing a concept in Python called vectorization.

The idea is that through the use of NumPy or PyTorch tensors, it's possible to improve the speed of mathematical calculations.

By vectorizing your code correctly, many of the for loops seen in the examples can be removed.

For a better example of how to vectorize A2C with PPO code, I suggest looking at the coding examples provided in the book Grokking Deep Reinforcement Learning by Miguel Morales.

Conclusion

I hope this article has helped you understand A2C with PPO a little better. In the next article I go over a soft introduction to multi-agent reinforcement learning.

OMSCS Reinforcement Learning Series:

  1. Georgia Tech Reinforcement Learning: Preparing for Success
  2. Single Agent Reinforcement Learning: Basic Concepts and Terminologies
  3. Turbocharging Advantage Actor-Critic with Proximal Policy Optimization
  4. Advantage Actor-Critic with Proximal Policy Optimization: A Journey Through Code
  5. Multi-Agent Reinforcement Learning Soft Introduction: Cooperation

Hello! I'm just a person who wants to help others on their programming journey.

Godot Tutorials

Student