Exercise Status: All exercises complete and verified

1️⃣ Setting up our agent

Learning Objectives
  • Understand the difference between the actor & critic networks, and what their roles are
  • Learn about & implement generalised advantage estimation
  • Build a replay memory to store & sample experiences
  • Design an agent class to step through the environment & record experiences

In this section, we'll do the following:

  • Define a dataclass to hold our PPO arguments
  • Write functions to create our actor and critic networks (which will eventually be stored in our PPOAgent instance)
  • Write a function to do generalized advantage estimation (this will be necessary when computing our objective function during the learning phase)
  • Fill in our ReplayMemory class (for storing and sampling experiences)
  • Fill in our PPOAgent class (a wrapper around our networks and our replay memory, which will turn them into an agent)

As a reminder, we'll be continually referring back to The 37 Implementation Details of Proximal Policy Optimization as we go through these exercises. Most of our sections will refer to one or more of these details.

PPO Arguments

Just like for DQN, we've provided you with a dataclass containing arguments for your train_ppo function. We've also given you a function from utils to display all these arguments (including which ones you've changed). Lots of these are the same as for the DQN dataclass.

Don't worry if these don't all make sense right now, they will by the end.

@dataclass
class PPOArgs:
    # Basic / global
    seed: int = 1
    env_id: str = "CartPole-v1"
    mode: Literal["classic-control", "atari", "mujoco"] = "classic-control"

    # Wandb / logging
    use_wandb: bool = False
    video_log_freq: int | None = None
    wandb_project_name: str = "PPOCartPole"
    wandb_entity: str | None = None

    # Duration of different phases
    total_timesteps: int = 500_000
    num_envs: int = 4
    num_steps_per_rollout: int = 128
    num_minibatches: int = 4
    batches_per_learning_phase: int = 4

    # Optimization hyperparameters
    lr: float = 2.5e-4
    max_grad_norm: float = 0.5

    # RL hyperparameters
    gamma: float = 0.99

    # PPO-specific hyperparameters
    gae_lambda: float = 0.95
    clip_coef: float = 0.2
    ent_coef: float = 0.01
    vf_coef: float = 0.25

    def __post_init__(self):
        self.batch_size = self.num_steps_per_rollout * self.num_envs

        assert self.batch_size % self.num_minibatches == 0, "batch_size must be divisible by num_minibatches"
        self.minibatch_size = self.batch_size // self.num_minibatches
        self.total_phases = self.total_timesteps // self.batch_size
        self.total_training_steps = self.total_phases * self.batches_per_learning_phase * self.num_minibatches

        self.video_save_path = section_dir / "videos"


args = PPOArgs(num_minibatches=2)  # changing this also changes minibatch_size and total_training_steps
arg_help(args)
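To make the __post_init__ arithmetic concrete, here's a quick standalone check of the derived quantities using the default argument values (a plain-Python sketch mirroring the dataclass logic, not part of the exercises):

```python
# Mirror of the __post_init__ arithmetic, using the default values above
num_steps_per_rollout, num_envs = 128, 4
num_minibatches, batches_per_learning_phase = 4, 4
total_timesteps = 500_000

batch_size = num_steps_per_rollout * num_envs  # experiences gathered per rollout phase
minibatch_size = batch_size // num_minibatches
total_phases = total_timesteps // batch_size   # number of (rollout + learning) cycles
total_training_steps = total_phases * batches_per_learning_phase * num_minibatches

print(batch_size, minibatch_size, total_phases, total_training_steps)  # 512 128 976 15616
```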

A note on the num_envs argument: unlike yesterday, envs will actually contain multiple instances of the environment (we did still have this argument yesterday, but it was always set to 1). From the 37 implementation details of PPO post:

In this architecture, PPO first initializes a vectorized environment envs that runs $N$ (usually independent) environments either sequentially or in parallel by leveraging multi-processes. envs presents a synchronous interface that always outputs a batch of $N$ observations from $N$ environments, and it takes a batch of $N$ actions to step the $N$ environments. When calling next_obs = envs.reset(), next_obs gets a batch of $N$ initial observations (pronounced "next observation"). PPO also initializes an environment done flag variable next_done (pronounced "next done") to an $N$-length array of zeros, where its i-th element next_done[i] has values of 0 or 1 which corresponds to the $i$-th sub-environment being not done and done, respectively.

Actor-Critic Implementation (detail #2)

PPO requires two networks, an actor and a critic. The actor is the most important one; its job is to learn an optimal policy $\pi_\theta(a_t \mid s_t)$ (it does this by training on the clipped surrogate objective function, which is essentially a direct estimation of the discounted sum of future rewards with some extra bells and whistles thrown in). Estimating this objective also requires estimating the advantage function $A_\theta(s_t, a_t)$, which in turn requires estimating the values $V_\theta(s_t)$ - this is why we need a critic network, which learns $V_\theta(s_t)$ by minimizing the TD residual (in a similar way to how our Q-network learned the $Q(s_t, a_t)$ values).

Exercise - implement get_actor_and_critic

Difficulty: 🔴🔴⚪⚪⚪
Importance: 🔵🔵🔵⚪⚪
You should spend up to 10-20 minutes on this exercise.

You should implement get_actor_and_critic_classic according to the diagram. We use separate actor and critic networks because detail #13 notes that this performs better than a single shared network in simple environments. Use layer_init to initialize each Linear, overriding the standard deviation argument std according to the diagram (when not specified, you should be using the default value of std=np.sqrt(2)).

We've given you a "high level function" get_actor_and_critic which calls one of three possible functions, depending on the mode argument. You'll implement the other two modes later. This is one way to keep our code modular.

def layer_init(layer: nn.Linear, std=np.sqrt(2), bias_const=0.0):
    t.nn.init.orthogonal_(layer.weight, std)
    t.nn.init.constant_(layer.bias, bias_const)
    return layer


def get_actor_and_critic(
    envs: gym.vector.SyncVectorEnv,
    mode: Literal["classic-control", "atari", "mujoco"] = "classic-control",
) -> tuple[nn.Module, nn.Module]:
    """
    Returns (actor, critic), the networks used for PPO, in one of 3 different modes.
    """
    assert mode in ["classic-control", "atari", "mujoco"]

    obs_shape = envs.single_observation_space.shape
    num_obs = np.array(obs_shape).prod()
    num_actions = (
        envs.single_action_space.n
        if isinstance(envs.single_action_space, gym.spaces.Discrete)
        else np.array(envs.single_action_space.shape).prod()
    )

    if mode == "classic-control":
        actor, critic = get_actor_and_critic_classic(num_obs, num_actions)
    if mode == "atari":
        actor, critic = get_actor_and_critic_atari(obs_shape, num_actions)  # you'll implement these later
    if mode == "mujoco":
        actor, critic = get_actor_and_critic_mujoco(num_obs, num_actions)  # you'll implement these later

    return actor.to(device), critic.to(device)


def get_actor_and_critic_classic(num_obs: int, num_actions: int):
    """
    Returns (actor, critic) in the "classic-control" case, according to diagram above.
    """
    raise NotImplementedError()


tests.test_get_actor_and_critic(get_actor_and_critic, mode="classic-control")
Question - what do you think is the benefit of using a small standard deviation for the last actor layer?

The purpose is to center the initial agent.actor logits around zero, in other words an approximately uniform distribution over all actions independent of the state. If you didn't do this, then your agent might get locked into a nearly-deterministic policy early on and find it difficult to train away from it.

Studies suggest this is one of the more important initialisation details, and performance is often harmed without it.
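A quick numerical illustration of this point (a numpy sketch with a hypothetical 5-action space): logits drawn with std 0.01 produce an approximately uniform softmax.

```python
import numpy as np

rng = np.random.default_rng(0)
num_actions = 5

# Final-layer outputs initialized with std=0.01: all logits are close to zero...
logits = rng.normal(0, 0.01, size=num_actions)

# ...so the resulting softmax is approximately uniform over actions
probs = np.exp(logits) / np.exp(logits).sum()
assert np.allclose(probs, 1 / num_actions, atol=0.02)
```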

Solution
def layer_init(layer: nn.Linear, std=np.sqrt(2), bias_const=0.0):
    t.nn.init.orthogonal_(layer.weight, std)
    t.nn.init.constant_(layer.bias, bias_const)
    return layer


def get_actor_and_critic(
    envs: gym.vector.SyncVectorEnv,
    mode: Literal["classic-control", "atari", "mujoco"] = "classic-control",
) -> tuple[nn.Module, nn.Module]:
    """
    Returns (actor, critic), the networks used for PPO, in one of 3 different modes.
    """
    assert mode in ["classic-control", "atari", "mujoco"]

    obs_shape = envs.single_observation_space.shape
    num_obs = np.array(obs_shape).prod()
    num_actions = (
        envs.single_action_space.n
        if isinstance(envs.single_action_space, gym.spaces.Discrete)
        else np.array(envs.single_action_space.shape).prod()
    )

    if mode == "classic-control":
        actor, critic = get_actor_and_critic_classic(num_obs, num_actions)
    if mode == "atari":
        actor, critic = get_actor_and_critic_atari(obs_shape, num_actions)  # you'll implement these later
    if mode == "mujoco":
        actor, critic = get_actor_and_critic_mujoco(num_obs, num_actions)  # you'll implement these later

    return actor.to(device), critic.to(device)


def get_actor_and_critic_classic(num_obs: int, num_actions: int):
    """
    Returns (actor, critic) in the "classic-control" case, according to diagram above.
    """
    actor = nn.Sequential(
        layer_init(nn.Linear(num_obs, 64)),
        nn.Tanh(),
        layer_init(nn.Linear(64, 64)),
        nn.Tanh(),
        layer_init(nn.Linear(64, num_actions), std=0.01),
    )
    critic = nn.Sequential(
        layer_init(nn.Linear(num_obs, 64)),
        nn.Tanh(),
        layer_init(nn.Linear(64, 64)),
        nn.Tanh(),
        layer_init(nn.Linear(64, 1), std=1.0),
    )
    return actor, critic


tests.test_get_actor_and_critic(get_actor_and_critic, mode="classic-control")

Generalized Advantage Estimation (detail #5)

The advantage function $A_\theta(s_t, a_t)$ is defined as $Q_\theta(s_t, a_t) - V_\theta(s_t)$, i.e. it's the difference between expected future reward when we take action $a_t$ vs taking the expected action according to policy $\pi_\theta$ from that point onwards. It's an important part of our objective function, because it tells us whether we should try and take more or less of action $a_t$ in state $s_t$.

Our critic estimates $V_\theta(s_t)$, but how can we estimate the terms $Q_\theta(s_t, a_t)$ and by extension $A_\theta(s_t, a_t)$? Here are two ideas:

  1. $\hat{A}_\theta(s_t, a_t) = \delta_t := r_t + \gamma V_\theta(s_{t+1}) - V_\theta(s_t)$, i.e. the 1-step residual just like we used in DQN
  2. $\hat{A}_\theta(s_t, a_t) = \delta_t + \gamma \delta_{t+1} + \gamma^2 \delta_{t+2} + ...$, i.e. the discounted sum of all future residuals in subsequent states & actions in the trajectory

The problem with (1) is that it's too myopic, because we're only looking 1 move ahead. If we sacrifice a chess piece to win the game 3 moves later, we want that to actually have positive advantage! (2) fixes this problem but creates a new one - the long time horizon makes the estimates very unstable, and so much of the trajectory is taken into account that it's hard to credit any individual action. The solution is somewhere in between: generalized advantage estimation (GAE), a geometrically decaying sum of future residuals. It's controlled by the parameter $\lambda$, where $\lambda=0$ reduces to the first idea (single-step, myopic) and $\lambda=1$ to the second (full trajectories, unstable). This way we can balance the two, and get the best of both worlds.

$$ \hat{A}^{\text{GAE}(\lambda)}_t=\delta_t+(\gamma \lambda) \delta_{t+1}+\cdots+(\gamma \lambda)^{T-1-t} \delta_{T-1} $$

Note a subtlety - we need to make sure $\delta_t$ is correctly defined as $r_t - V_\theta(s_t)$ in terminating states, i.e. we don't include the $V_\theta(s_{t+1})$ term. We can actually compute the GAE estimator (taking this into account) with the following recursive formula:

$$ \hat{A}^{\text{GAE}(\lambda)}_t = \delta_t + (1 - d_{t+1}) (\gamma \lambda) \hat{A}^{\text{GAE}(\lambda)}_{t+1} $$

Derivation (short)

If $d_{t+1}=1$ (i.e. we just terminated) then we'll get no further rewards, so our advantage on the final step of this trajectory is just $A_t = r_t - V(s_t) = \delta_t$ (since $V(s_t)$ was the estimate for future rewards before termination, and $r_t$ is the reward we actually get before terminating). So the formula above is correct for the terminal step.

Working backwards from the terminal step and applying this recursive formula, we get:

$$ \begin{aligned} \hat{A}^{\text{GAE}(\lambda)}_{t} &= \delta_{t} \\ \hat{A}^{\text{GAE}(\lambda)}_{t-1} &= \delta_{t-1} + (\gamma \lambda) \hat{A}^{\text{GAE}(\lambda)}_{t} = \delta_{t-1} + \gamma \lambda \delta_t \\ \hat{A}^{\text{GAE}(\lambda)}_{t-2} &= \delta_{t-2} + (\gamma \lambda) \hat{A}^{\text{GAE}(\lambda)}_{t-1} = \delta_{t-2} + \gamma \lambda \left(\delta_{t-1} + (\gamma\lambda) \delta_t\right) = \delta_{t-2} + \gamma \lambda \delta_{t-1} + (\gamma\lambda)^2 \delta_t \\ &\dots \end{aligned} $$
and so on. This exactly matches the formula given above.
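To check your understanding of the recursion, here it is applied by hand to a tiny 3-step trajectory (plain Python with made-up numbers, not part of the exercise):

```python
gamma, lam = 0.99, 0.95
rewards = [1.0, 1.0, 1.0]   # r_0, r_1, r_2
values = [0.5, 0.6, 0.7]    # V(s_0), V(s_1), V(s_2)
next_value = 0.8            # V(s_3)
terminated = [0, 0, 0]      # d_0, d_1, d_2 (no terminations)
next_terminated = 0         # d_3

# delta_t = r_t + gamma * (1 - d_{t+1}) * V(s_{t+1}) - V(s_t)
next_values = values[1:] + [next_value]
next_dones = terminated[1:] + [next_terminated]
deltas = [r + gamma * (1 - d) * nv - v
          for r, nv, d, v in zip(rewards, next_values, next_dones, values)]

# A_t = delta_t + (1 - d_{t+1}) * gamma * lam * A_{t+1}, working backwards
adv = [0.0] * 3
adv[-1] = deltas[-1]
for s in reversed(range(2)):
    adv[s] = deltas[s] + gamma * lam * (1 - next_dones[s]) * adv[s + 1]

# adv ≈ [3.0879, 2.1200, 1.0920]
```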

Exercise - implement compute_advantages

Difficulty: 🔴🔴🔴🔴⚪
Importance: 🔵🔵🔵⚪⚪
You should spend up to 20-30 minutes on this exercise.

Below, you should fill in compute_advantages. We recommend using a reversed for loop over $t$ to get it working, and using the recursive formula for GAE given above - don't worry about trying to vectorize it.

Tip - make sure you understand what the indices are of the tensors you've been given! The tensors rewards, values and terminated contain $r_t$, $V(s_t)$ and $d_t$ respectively for all $t = 0, 1, ..., T-1$, and next_value, next_terminated are the values $V(s_T)$ and $d_T$ respectively (required for the calculation of the very last advantage $A_{T-1}$).

@t.inference_mode()
def compute_advantages(
    next_value: Float[Tensor, "num_envs"],
    next_terminated: Bool[Tensor, "num_envs"],
    rewards: Float[Tensor, "buffer_size num_envs"],
    values: Float[Tensor, "buffer_size num_envs"],
    terminated: Bool[Tensor, "buffer_size num_envs"],
    gamma: float,
    gae_lambda: float,
) -> Float[Tensor, "buffer_size num_envs"]:
    """
    Compute advantages using Generalized Advantage Estimation.
    """
    raise NotImplementedError()


tests.test_compute_advantages(compute_advantages)
Help - I get RuntimeError: Subtraction, the `-` operator, with a bool tensor is not supported

This is probably because you're trying to perform an operation on a boolean tensor terminated or next_terminated which was designed for floats. You can fix this by casting the boolean tensor to a float tensor.

Solution
@t.inference_mode()
def compute_advantages(
    next_value: Float[Tensor, "num_envs"],
    next_terminated: Bool[Tensor, "num_envs"],
    rewards: Float[Tensor, "buffer_size num_envs"],
    values: Float[Tensor, "buffer_size num_envs"],
    terminated: Bool[Tensor, "buffer_size num_envs"],
    gamma: float,
    gae_lambda: float,
) -> Float[Tensor, "buffer_size num_envs"]:
    """
    Compute advantages using Generalized Advantage Estimation.
    """
    T = values.shape[0]
    terminated = terminated.float()
    next_terminated = next_terminated.float()

    # Get tensors of V(s_{t+1}) and d_{t+1} for all t = 0, 1, ..., T-1
    next_values = t.concat([values[1:], next_value[None, :]])
    next_terminated = t.concat([terminated[1:], next_terminated[None, :]])

    # Compute deltas: \delta_t = r_t + (1 - d_{t+1}) \gamma V(s_{t+1}) - V(s_t)
    deltas = rewards + gamma * next_values * (1.0 - next_terminated) - values

    # Compute advantages using the recursive formula, starting with advantages[T-1] = deltas[T-1]
    # and working backwards.
    advantages = t.zeros_like(deltas)
    advantages[-1] = deltas[-1]
    for s in reversed(range(T - 1)):
        advantages[s] = deltas[s] + gamma * gae_lambda * (1.0 - terminated[s + 1]) * advantages[s + 1]

    return advantages

Replay Memory

Our replay memory has some similarities to the replay buffer from yesterday, as well as some important differences.

Sampling method

Yesterday, we continually updated our buffer and sliced off old data, and each time we called sample we'd take a randomly ordered subset of that data (with replacement).

With PPO, we alternate between rollout and learning phases. In rollout, we fill our replay memory entirely. In learning, we call get_minibatches to return the entire contents of the replay memory, but randomly shuffled and sorted into minibatches. In this way, we update on every experience, not just random samples. In fact, we'll update on each experience more than once, since we'll repeat the process of (generate minibatches, update on all of them) batches_per_learning_phase times during each learning phase.
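A minimal sketch of this sampling scheme (plain Python with hypothetical sizes), confirming that every experience is trained on exactly batches_per_learning_phase times per learning phase:

```python
import random
from collections import Counter

batch_size, minibatch_size, batches_per_learning_phase = 8, 4, 2

minibatches = []
for _ in range(batches_per_learning_phase):
    # Fresh shuffle of the entire memory, split into minibatches
    perm = random.sample(range(batch_size), batch_size)
    minibatches += [perm[i : i + minibatch_size] for i in range(0, batch_size, minibatch_size)]

# Every index appears in exactly batches_per_learning_phase minibatches
counts = Counter(idx for mb in minibatches for idx in mb)
assert all(c == batches_per_learning_phase for c in counts.values())
```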

New variables

We store some of the same variables as before - $(s_t, a_t, d_t)$ - but with the addition of 3 new variables: the logprobs $\log \pi(a_t\mid s_t)$, the advantages $\hat{A}_t$ and the returns. Explaining these variables and why we need them:

  • logprobs are calculated from the logit outputs of our agent.actor network, corresponding to the actions $a_t$ which our agent actually chose.
    • These are necessary for calculating the clipped surrogate objective (see equation $(7)$ on page 3 in the PPO Algorithms paper), which as we'll see later makes sure the agent isn't rewarded for changing its policy an excessive amount.
  • advantages are the terms $\hat{A}_t$, computed using our function compute_advantages from earlier.
    • Again, these are used in the calculation of the clipped surrogate objective.
  • returns are given by the formula returns = advantages + values - see detail #9.
    • They are used to train the value network, in a way which is equivalent to minimizing the TD residual loss used in DQN.

Don't worry if you don't understand all of this now, we'll get to all these variables later.
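As a sanity check on the returns formula from detail #9 (plain Python with made-up numbers): when $\lambda = 1$ and there are no terminations, returns = advantages + values telescopes to the discounted rewards-to-go plus a bootstrapped $\gamma^{T-t} V(s_T)$ term:

```python
gamma, lam = 0.99, 1.0
rewards = [1.0, 1.0, 1.0]
values = [0.5, 0.6, 0.7]
next_value = 0.8

# GAE with lambda = 1 and no terminations
next_values = values[1:] + [next_value]
deltas = [r + gamma * nv - v for r, nv, v in zip(rewards, next_values, values)]
adv = [0.0] * 3
adv[-1] = deltas[-1]
for s in reversed(range(2)):
    adv[s] = deltas[s] + gamma * lam * adv[s + 1]

returns = [a + v for a, v in zip(adv, values)]

# Compare against the explicit discounted sum: r_0 + g*r_1 + g^2*r_2 + g^3*V(s_3)
direct = 1.0 + gamma * 1.0 + gamma**2 * 1.0 + gamma**3 * next_value
assert abs(returns[0] - direct) < 1e-9
```

The intermediate value terms cancel in the telescoping sum, which is why returns[0] doesn't depend on $V(s_1)$ or $V(s_2)$.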

Exercise - implement minibatch_indices

Difficulty: 🔴🔴⚪⚪⚪
Importance: 🔵🔵⚪⚪⚪
You should spend up to 10-15 minutes on this exercise.

We'll start by implementing the get_minibatch_indices function, as described in detail #6. This will give us a list of num_minibatches = batch_size // minibatch_size arrays of indices, each of length minibatch_size, which collectively represent a permutation of the indices [0, 1, ..., batch_size - 1], where batch_size = num_minibatches * minibatch_size. To help visualize how this works to create our minibatches, we've included a diagram:

The test code below should also make it clear what your function should be returning.

def get_minibatch_indices(rng: Generator, batch_size: int, minibatch_size: int) -> list[np.ndarray]:
    """
    Return a list of length `num_minibatches`, where each element is an array of `minibatch_size` indices, and the union
    of all the arrays is the set of indices [0, 1, ..., batch_size - 1], where `batch_size = num_steps_per_rollout * num_envs`.
    """
    assert batch_size % minibatch_size == 0
    raise NotImplementedError()


rng = np.random.default_rng(0)

batch_size = 12
minibatch_size = 6
# num_minibatches = batch_size // minibatch_size = 2

indices = get_minibatch_indices(rng, batch_size, minibatch_size)

assert isinstance(indices, list)
assert all(isinstance(x, np.ndarray) for x in indices)
assert np.array(indices).shape == (2, 6)
assert sorted(np.unique(indices)) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
print("All tests for `get_minibatch_indices` passed!")
Solution
def get_minibatch_indices(rng: Generator, batch_size: int, minibatch_size: int) -> list[np.ndarray]:
    """
    Return a list of length `num_minibatches`, where each element is an array of `minibatch_size` indices, and the union
    of all the arrays is the set of indices [0, 1, ..., batch_size - 1], where `batch_size = num_steps_per_rollout * num_envs`.
    """
    assert batch_size % minibatch_size == 0
    num_minibatches = batch_size // minibatch_size
    indices = rng.permutation(batch_size).reshape(num_minibatches, minibatch_size)
    return list(indices)

ReplayMemory class

Next, we've given you the ReplayMemory class. This follows a very similar structure to the DQN equivalent ReplayBuffer yesterday, with a bit of added complexity. We'll highlight the key differences below:

  • There's no [-self.buffer_size:] slicing like there was in the DQN buffer yesterday. That's because rather than continually adding to our buffer and removing the oldest data, we'll iterate through a process of (fill entire memory, generate a bunch of minibatches from that memory and train on them, empty the memory, repeat).
  • The get_minibatches method computes the advantages and returns. This isn't really in line with the SoC (separation of concerns) principle, but this is the easiest place to compute them because we can't do it after we sample the minibatches.
  • A single learning phase involves creating num_minibatches = batch_size // minibatch_size minibatches and training on each of them, and then repeating this process batches_per_learning_phase times. So the total number of minibatches per learning phase is batches_per_learning_phase * num_minibatches.
Question - can you see why advantages can't be computed after we sample minibatches?

The samples are not in chronological order; they're shuffled. The formula for computing advantages requires the data to be in chronological order.

@dataclass
class ReplayMinibatch:
    """
    Samples from the replay memory, converted to PyTorch for use in neural network training.

    Data is equivalent to (s_t, a_t, logpi(a_t|s_t), A_t, A_t + V(s_t), d_{t+1})
    """

    obs: Float[Tensor, " minibatch_size *obs_shape"]
    actions: Int[Tensor, " minibatch_size *action_shape"]
    logprobs: Float[Tensor, " minibatch_size"]
    advantages: Float[Tensor, " minibatch_size"]
    returns: Float[Tensor, " minibatch_size"]
    terminated: Bool[Tensor, " minibatch_size"]


class ReplayMemory:
    """
    Contains buffer; has a method to sample from it to return a ReplayMinibatch object.
    """

    rng: Generator
    obs: Float[Arr, " buffer_size num_envs *obs_shape"]
    actions: Int[Arr, " buffer_size num_envs *action_shape"]
    logprobs: Float[Arr, " buffer_size num_envs"]
    values: Float[Arr, " buffer_size num_envs"]
    rewards: Float[Arr, " buffer_size num_envs"]
    terminated: Bool[Arr, " buffer_size num_envs"]

    def __init__(
        self,
        num_envs: int,
        obs_shape: tuple,
        action_shape: tuple,
        batch_size: int,
        minibatch_size: int,
        batches_per_learning_phase: int,
        seed: int = 42,
    ):
        self.num_envs = num_envs
        self.obs_shape = obs_shape
        self.action_shape = action_shape
        self.batch_size = batch_size
        self.minibatch_size = minibatch_size
        self.batches_per_learning_phase = batches_per_learning_phase
        self.rng = np.random.default_rng(seed)
        self.reset()

    def reset(self):
        """Resets all stored experiences, ready for new ones to be added to memory."""
        self.obs = np.empty((0, self.num_envs, *self.obs_shape), dtype=np.float32)
        self.actions = np.empty((0, self.num_envs, *self.action_shape), dtype=np.int32)
        self.logprobs = np.empty((0, self.num_envs), dtype=np.float32)
        self.values = np.empty((0, self.num_envs), dtype=np.float32)
        self.rewards = np.empty((0, self.num_envs), dtype=np.float32)
        self.terminated = np.empty((0, self.num_envs), dtype=bool)

    def add(
        self,
        obs: Float[Arr, " num_envs *obs_shape"],
        actions: Int[Arr, " num_envs *action_shape"],
        logprobs: Float[Arr, " num_envs"],
        values: Float[Arr, " num_envs"],
        rewards: Float[Arr, " num_envs"],
        terminated: Bool[Arr, " num_envs"],
    ) -> None:
        """Add a batch of transitions to the replay memory."""
        # Check shapes & datatypes
        for data, expected_shape in zip(
            [obs, actions, logprobs, values, rewards, terminated],
            [self.obs_shape, self.action_shape, (), (), (), ()],
        ):
            assert isinstance(data, np.ndarray)
            assert data.shape == (self.num_envs, *expected_shape)

        # Add data to buffer (not slicing off old elements)
        self.obs = np.concatenate((self.obs, obs[None, :]))
        self.actions = np.concatenate((self.actions, actions[None, :]))
        self.logprobs = np.concatenate((self.logprobs, logprobs[None, :]))
        self.values = np.concatenate((self.values, values[None, :]))
        self.rewards = np.concatenate((self.rewards, rewards[None, :]))
        self.terminated = np.concatenate((self.terminated, terminated[None, :]))

    def get_minibatches(
        self, next_value: Tensor, next_terminated: Tensor, gamma: float, gae_lambda: float
    ) -> list[ReplayMinibatch]:
        """
        Returns a list of minibatches. Each minibatch has size `minibatch_size`, and the union over
        all minibatches is `batches_per_learning_phase` copies of the entire replay memory.
        """
        # Convert everything to tensors on the correct device
        obs, actions, logprobs, values, rewards, terminated = (
            t.tensor(x, device=device)
            for x in [
                self.obs,
                self.actions,
                self.logprobs,
                self.values,
                self.rewards,
                self.terminated,
            ]
        )

        # Compute advantages & returns
        advantages = compute_advantages(next_value, next_terminated, rewards, values, terminated, gamma, gae_lambda)
        returns = advantages + values

        # Return a list of minibatches
        minibatches = []
        for _ in range(self.batches_per_learning_phase):
            for indices in get_minibatch_indices(self.rng, self.batch_size, self.minibatch_size):
                minibatches.append(
                    ReplayMinibatch(
                        *[
                            data.flatten(0, 1)[indices]
                            for data in [obs, actions, logprobs, advantages, returns, terminated]
                        ]
                    )
                )

        # Reset memory (since we only need to call this method once per learning phase)
        self.reset()

        return minibatches

Like before, here's some code to generate and plot observations.

The first plot shows the current observations $s_t$ (with dotted lines indicating a terminated episode $d_{t+1} = 1$). The solid lines indicate the transition between different environments in envs (because unlike yesterday, we're actually using more than one environment in our SyncVectorEnv). There are batch_size = num_steps_per_rollout * num_envs = 128 * 2 = 256 observations in total, with 128 coming from each environment.

The second plot shows a single minibatch of experiences sampled from the full memory. Each minibatch has size minibatch_size = 128, and minibatches contains batches_per_learning_phase * (batch_size // minibatch_size) = 2 * 2 = 4 minibatches in total.

Note that we don't need to worry about terminal observations here, because we're not actually logging next_obs (unlike DQN, this won't be part of our loss function).

num_steps_per_rollout = 128
num_envs = 2
batch_size = num_steps_per_rollout * num_envs  # 256

minibatch_size = 128
num_minibatches = batch_size // minibatch_size  # 2

batches_per_learning_phase = 2

envs = gym.vector.SyncVectorEnv([make_env("CartPole-v1", i, i, "test") for i in range(num_envs)])
memory = ReplayMemory(num_envs, (4,), (), batch_size, minibatch_size, batches_per_learning_phase)

logprobs = values = np.zeros(envs.num_envs)  # dummy values, just so we can see demo of plot
obs, _ = envs.reset()

for i in range(num_steps_per_rollout):
    # Choose random action, and take a step in the environment
    actions = envs.action_space.sample()
    next_obs, rewards, terminated, truncated, infos = envs.step(actions)

    # Add experience to memory
    memory.add(obs, actions, logprobs, values, rewards, terminated)
    obs = next_obs

plot_cartpole_obs_and_dones(
    memory.obs,
    memory.terminated,
    title="Current obs s<sub>t</sub><br>Dotted lines indicate d<sub>t+1</sub> = 1, solid lines are environment separators",
)

next_value = next_done = t.zeros(envs.num_envs).to(device)  # dummy values, just so we can see demo of plot
minibatches = memory.get_minibatches(next_value, next_done, gamma=0.99, gae_lambda=0.95)

plot_cartpole_obs_and_dones(
    minibatches[0].obs.cpu(),
    minibatches[0].terminated.cpu(),
    title="Current obs (sampled)<br>this is what gets fed into our model for training",
)
Click to see the expected output

PPO Agent

As the final task in this section, you should fill in the agent's play_step method. This is conceptually similar to what you did during DQN, but with a few key differences:

  • In DQN we selected actions based on our Q-network & an epsilon-greedy policy; here, your actions will be sampled directly from your actor network
  • Here, you'll have to compute the extra data logprobs and values, which we didn't have to deal with in DQN

Exercise - implement PPOAgent

Difficulty: 🔴🔴🔴🔴⚪
Importance: 🔵🔵🔵🔵⚪
You should spend up to 20-40 minutes on this exercise.

A few tips:

  • When sampling actions (and calculating logprobs), you might find torch.distributions.categorical.Categorical useful. If logits is a 2D tensor of shape (N, k) containing a batch of logit vectors and dist = Categorical(logits=logits), then:
    • actions = dist.sample() will give you a vector of N sampled actions (which will be integers in the range [0, k)),
    • logprobs = dist.log_prob(actions) will give you a vector of the N logprobs corresponding to the sampled actions
  • Make sure to use inference mode when using obs to compute logits and values, since all you're doing here is getting experiences for your memory - you aren't doing gradient descent based on these values.
  • Check the shape of your arrays when adding them to memory (the add method has lots of assert statements here to help you), and also make sure that they are arrays not tensors by calling .cpu().numpy() on them.
  • Remember to update self.next_obs and self.next_terminated at the end of the function!
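To illustrate the Categorical usage from the first tip, here's a standalone sketch (assuming PyTorch imported as t, matching the convention in these exercises):

```python
import torch as t
from torch.distributions.categorical import Categorical

# Batch of N=2 logit vectors over k=3 actions (made-up numbers)
logits = t.tensor([[0.0, 0.0, 0.0], [5.0, -5.0, -5.0]])
dist = Categorical(logits=logits)

actions = dist.sample()            # shape (2,), integers in [0, 3)
logprobs = dist.log_prob(actions)  # shape (2,), log pi(a|s) for the sampled actions

assert actions.shape == logprobs.shape == (2,)
```

Note that the second row's logits heavily favor action 0, so its sampled action will almost always be 0 with logprob close to zero.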
class PPOAgent:
    critic: nn.Sequential
    actor: nn.Sequential

    def __init__(
        self,
        envs: gym.vector.SyncVectorEnv,
        actor: nn.Module,
        critic: nn.Module,
        memory: ReplayMemory,
    ):
        super().__init__()
        self.envs = envs
        self.actor = actor
        self.critic = critic
        self.memory = memory

        self.step = 0  # Tracking number of steps taken (across all environments)
        self.next_obs = t.tensor(envs.reset()[0], device=device, dtype=t.float)  # need starting obs (in tensor form)
        self.next_terminated = t.zeros(envs.num_envs, device=device, dtype=t.bool)  # need starting termination=False

    def play_step(self) -> list[dict]:
        """
        Carries out a single interaction step between the agent and the environment, and adds
        results to the replay memory.

        Returns the list of info dicts returned from `self.envs.step`.
        """
        # Get newest observations (i.e. where we're starting from)
        obs = self.next_obs
        terminated = self.next_terminated

        raise NotImplementedError()

        self.step += self.envs.num_envs
        return infos

    def get_minibatches(self, gamma: float, gae_lambda: float) -> list[ReplayMinibatch]:
        """
        Gets minibatches from the replay memory, and resets the memory
        """
        with t.inference_mode():
            next_value = self.critic(self.next_obs).flatten()
        minibatches = self.memory.get_minibatches(next_value, self.next_terminated, gamma, gae_lambda)
        self.memory.reset()
        return minibatches


tests.test_ppo_agent(PPOAgent)
Solution
class PPOAgent:
    critic: nn.Sequential
    actor: nn.Sequential

    def __init__(
        self,
        envs: gym.vector.SyncVectorEnv,
        actor: nn.Module,
        critic: nn.Module,
        memory: ReplayMemory,
    ):
        super().__init__()
        self.envs = envs
        self.actor = actor
        self.critic = critic
        self.memory = memory

        self.step = 0  # Tracking number of steps taken (across all environments)
        self.next_obs = t.tensor(envs.reset()[0], device=device, dtype=t.float)  # need starting obs (in tensor form)
        self.next_terminated = t.zeros(envs.num_envs, device=device, dtype=t.bool)  # need starting termination=False

    def play_step(self) -> list[dict]:
        """
        Carries out a single interaction step between the agent and the environment, and adds
        results to the replay memory.

        Returns the list of info dicts returned from `self.envs.step`.
        """
        # Get newest observations (i.e. where we're starting from)
        obs = self.next_obs
        terminated = self.next_terminated

        # Compute logits based on newest observation, and use it to get an action distribution we
        # sample from.
        with t.inference_mode():
            logits = self.actor(obs)
        dist = Categorical(logits=logits)
        actions = dist.sample()

        # Step environment based on the sampled action
        next_obs, rewards, next_terminated, next_truncated, infos = self.envs.step(actions.cpu().numpy())

        # Calculate logprobs and values, and add this all to replay memory
        logprobs = dist.log_prob(actions).cpu().numpy()
        with t.inference_mode():
            values = self.critic(obs).flatten().cpu().numpy()
        self.memory.add(
            obs.cpu().numpy(),
            actions.cpu().numpy(),
            logprobs,
            values,
            rewards,
            terminated.cpu().numpy(),
        )

        # Set next observation & termination state
        self.next_obs = t.from_numpy(next_obs).to(device, dtype=t.float)
        self.next_terminated = t.from_numpy(next_terminated).to(device, dtype=t.bool)

        self.step += self.envs.num_envs
        return infos

    def get_minibatches(self, gamma: float, gae_lambda: float) -> list[ReplayMinibatch]:
        """
        Gets minibatches from the replay memory, and resets the memory
        """
        with t.inference_mode():
            next_value = self.critic(self.next_obs).flatten()
        minibatches = self.memory.get_minibatches(next_value, self.next_terminated, gamma, gae_lambda)
        self.memory.reset()
        return minibatches