
5️⃣ MuJoCo

Learning Objectives
  • Understand how PPO can be used to train agents in continuous action spaces
  • Install and interact with the MuJoCo physics engine
  • Train an agent to solve the Hopper environment

An important note - MuJoCo environments are notoriously demanding when it comes to having exactly the right libraries and versions installed. For one thing, they require Python version <3.11, which means they currently won't work in Colab until a fix for this problem is found. To get them working, you'll need to go through the process of creating a new virtual environment with Python version 3.10 - we recommend creating a Linux-based VM via the instructions on the Streamlit homepage, and swapping python=3.11 for python=3.10 when you go through the "Workspace setup instructions" code.

Installation & Rendering

Once you've gone through the step described above (a new virtual env with Python version 3.10), you should re-run all the imports up to this point in the file, as well as the following code, which will install all the MuJoCo-specific packages and dependencies:

%pip install mujoco free-mujoco-py

!sudo apt-get install -y libgl1-mesa-dev libgl1-mesa-glx libglew-dev libosmesa6-dev software-properties-common
!sudo apt-get install -y patchelf

To test that this works, run the following. The first time you run this, it might take about 1-2 minutes, and throw up several warnings and messages. But the cell should still run without raising an exception, and all subsequent times you run it, it should be a lot faster (with no error messages).

env = gym.make("Hopper-v4", render_mode="rgb_array")

print(env.action_space)
print(env.observation_space)
Box(-1.0, 1.0, (3,), float32)
Box(-inf, inf, (11,), float64)

Previously, we've dealt with discrete action spaces (e.g. going right or left in CartPole). But here, we have a continuous action space - the actions take the form of a vector of 3 values, each in the range [-1.0, 1.0].

Question - after reading the documentation page, can you see exactly what our 3 actions mean?

They represent the torque applied between the three different links of the hopper. There is:

  • The thigh rotor (i.e. connecting the upper and middle parts of the leg),
  • The leg rotor (i.e. connecting the middle and lower parts of the leg),
  • The foot rotor (i.e. connecting the lower part of the leg to the foot).

How do we deal with a continuous action space, when it comes to choosing actions? Rather than our actor network's output being a vector of logits which we turn into a probability distribution via Categorical(logits=logits), we instead have our actor output two vectors mu and log_sigma, which we turn into a normal distribution which is then sampled from.
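As a concrete sketch of this idea (with hypothetical values standing in for the actor's two outputs), here's how a continuous action gets sampled and clipped into Hopper's allowed range:

```python
import torch as t

mu = t.tensor([0.2, -0.5, 0.1])  # hypothetical actor_mu output for one state
log_sigma = t.zeros(3)           # hypothetical state-independent log std
sigma = log_sigma.exp()          # exponentiate so sigma is always positive

dist = t.distributions.Normal(mu, sigma)
action = dist.sample()           # one continuous 3-dim action
action = action.clamp(-1.0, 1.0) # clip into the allowed action space
```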

The observations take the form of a vector of 11 values describing the positions, velocities, and forces applied to the joints. Unlike in Atari, the observations aren't images, so we can't visualize the environment directly from them - instead we'll use env.render(), which returns an array representing the environment state (thanks to the fact that we initialized the env with render_mode="rgb_array").

nsteps = 150

frames = []
obs, info = env.reset()
for _ in tqdm(range(nsteps)):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    frames.append(env.render())  # frames can't come from obs, because unlike in Atari our observations aren't images

display_frames(np.stack(frames))

Implementational details of MuJoCo

Clipping, Scaling & Normalization (details #5-9)

Just like for Atari, there are a few messy implementational details which will be taken care of with gym wrappers. For example, if we generate our actions by sampling from a normal distribution, then there's some non-zero chance that our action will be outside of the allowed action space. We deal with this by clipping the actions to be within the allowed range (in this case between -1 and 1).

See the function prepare_mujoco_env within part3_ppo/utils (and read details 5-9 on the PPO page) for more information.
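As a rough illustration of what the observation-normalization part of this wrapper stack does internally (a sketch, not the actual gym implementation - the real wrappers also handle reward normalization and vectorized envs):

```python
import numpy as np

class RunningMeanStd:
    """Running estimate of mean/variance, as used by observation-normalization wrappers."""

    def __init__(self, shape):
        self.mean = np.zeros(shape)
        self.var = np.ones(shape)
        self.count = 1e-4  # tiny prior count, so the first batch dominates

    def update(self, x):
        # Parallel-variance update, combining batch stats with running stats
        batch_mean, batch_var, batch_count = x.mean(0), x.var(0), x.shape[0]
        delta = batch_mean - self.mean
        total = self.count + batch_count
        self.var = (
            self.var * self.count
            + batch_var * batch_count
            + delta**2 * self.count * batch_count / total
        ) / total
        self.mean = self.mean + delta * batch_count / total
        self.count = total


def normalize_obs(obs, rms, clip=10.0):
    # Normalize using running stats, then clip (like NormalizeObservation + clipping)
    return np.clip((obs - rms.mean) / np.sqrt(rms.var + 1e-8), -clip, clip)
```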

Actor and Critic networks (details #1-4)

Our actor and critic networks are quite similar to the ones we used for CartPole, except that they won't have any shared architecture.

Question - can you see why it's less useful to have shared architecture in this case, relative to the case of Atari?

The point of the shared architecture in Atari was that it allowed our critic and actor to perform feature extraction, i.e. the early part of the network (which was fed the raw pixel input) generated a high-level representation of the state, which was then fed into the actor and critic heads. But for CartPole and for MuJoCo, we have a very small observation space (4 values in the case of CartPole, 11 for the Hopper in MuJoCo), so there's no feature extraction necessary.

The only difference will be in the actor network. There will be an actor_mu and actor_log_sigma network. The actor_mu will have exactly the same architecture as the CartPole actor network, and it will output a vector used as the mean of our normal distribution. The actor_log_sigma network will just be a bias, since the standard deviation is state-independent (detail #2).

Because of this extra complexity, we'll create a class for our actor and critic networks.

Exercise - implement Actor and Critic

Difficulty: 🔴🔴⚪⚪⚪
Importance: 🔵🔵🔵⚪⚪
You should spend up to 10-15 minutes on this exercise.

As discussed, the architecture of actor_mu is identical to your CartPole actor network, and the critic is identical too. The only difference is the addition of actor_log_sigma, which you should initialize as an nn.Parameter object of shape (1, num_actions).

Your Actor class's forward function should return a tuple of (mu, sigma, dist), where mu and sigma are the parameters of the normal distribution, and dist is the t.distributions.Normal object created from them.

Why do we use log_sigma rather than just outputting sigma?

We have our network learn log_sigma rather than sigma because the standard deviation must always be positive. The log standard deviation, on the other hand, can take any real value, so we can treat it just like a regular learned weight and recover sigma by exponentiating.

Tip - when creating your distribution, you can use the broadcast_to tensor method, so that your standard deviation and mean are the same shape.
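For instance, here's a minimal sketch (with hypothetical shapes) of how exponentiating and broadcasting a state-independent log_sigma lines it up with a batched mu:

```python
import torch as t

log_sigma = t.nn.Parameter(t.zeros(1, 3))  # unconstrained, like a regular weight
mu = t.randn(8, 3)                          # hypothetical batch of actor_mu outputs

sigma = log_sigma.exp()                     # exp maps any real value to (0, inf)
sigma = sigma.broadcast_to(mu.shape)        # (1, 3) -> (8, 3), matching mu
dist = t.distributions.Normal(mu, sigma)
```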

We've given you the function get_actor_and_critic_mujoco (which is called when you call get_actor_and_critic with mode="mujoco"). All you need to do is fill in the Actor and Critic classes.

class Critic(nn.Module):
    def __init__(self, num_obs):
        super().__init__()
        raise NotImplementedError()

    def forward(self, obs) -> Tensor:
        raise NotImplementedError()


class Actor(nn.Module):
    actor_mu: nn.Sequential
    actor_log_sigma: nn.Parameter

    def __init__(self, num_obs, num_actions):
        super().__init__()
        raise NotImplementedError()

    def forward(self, obs) -> tuple[Tensor, Tensor, t.distributions.Normal]:
        raise NotImplementedError()


def get_actor_and_critic_mujoco(num_obs: int, num_actions: int):
    """
    Returns (actor, critic) in the "mujoco" case, according to the description above.
    """
    return Actor(num_obs, num_actions), Critic(num_obs)


tests.test_get_actor_and_critic(get_actor_and_critic, mode="mujoco")
Solution
class Critic(nn.Module):
    def __init__(self, num_obs):
        super().__init__()
        self.critic = nn.Sequential(
            layer_init(nn.Linear(num_obs, 64)),
            nn.Tanh(),
            layer_init(nn.Linear(64, 64)),
            nn.Tanh(),
            layer_init(nn.Linear(64, 1), std=1.0),
        )

    def forward(self, obs) -> Tensor:
        value = self.critic(obs)
        return value


class Actor(nn.Module):
    actor_mu: nn.Sequential
    actor_log_sigma: nn.Parameter

    def __init__(self, num_obs, num_actions):
        super().__init__()
        self.actor_mu = nn.Sequential(
            layer_init(nn.Linear(num_obs, 64)),
            nn.Tanh(),
            layer_init(nn.Linear(64, 64)),
            nn.Tanh(),
            layer_init(nn.Linear(64, num_actions), std=0.01),
        )
        self.actor_log_sigma = nn.Parameter(t.zeros(1, num_actions))

    def forward(self, obs) -> tuple[Tensor, Tensor, t.distributions.Normal]:
        mu = self.actor_mu(obs)
        sigma = t.exp(self.actor_log_sigma).broadcast_to(mu.shape)
        dist = t.distributions.Normal(mu, sigma)
        return mu, sigma, dist

Exercise - additional rewrites

Difficulty: 🔴🔴🔴⚪⚪
Importance: 🔵🔵⚪⚪⚪
You should spend up to 10-25 minutes on this exercise.

There are a few more rewrites you'll need for continuous action spaces, which is why we recommend that you create a new solutions file for this part (like we've done with solutions.py and solutions_cts.py).

You'll need to make the following changes:

Logprobs and entropy

Rather than using probs = Categorical(logits=logits) as your distribution (which you sample from & pass into your loss functions), you'll just use dist as your distribution. Methods like .log_prob(action) and .entropy() will work on dist just like they did on probs.

Note that these two methods will return objects of shape (batch_size, action_shape) (e.g. for Hopper the last dimension will be 3). We treat the action components as independent (detail #4), meaning we take a product of the probabilities, so we sum the logprobs / entropies. For example:

$$ \begin{aligned} \operatorname{prob}\left(a_t\right)&=\operatorname{prob}\left(a_t^1\right) \cdot \operatorname{prob}\left(a_t^2\right) \\ \log \operatorname{prob}\left(a_t\right)&=\log \operatorname{prob}\left(a_t^1\right) + \log \operatorname{prob}\left(a_t^2\right) \end{aligned} $$

So you'll need to sum logprobs and entropy over the last dimension. The logprobs value that you add to the replay memory should be summed over (because you don't need the individual logprobs, you only need the logprob of the action as a whole).
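To make this concrete, here's a small sketch (hypothetical shapes) showing that summing per-component logprobs gives the logprob of the joint action, with the analogous sum for entropy:

```python
import torch as t

mu = t.zeros(4, 3)   # hypothetical minibatch: 4 states, 3 action components
sigma = t.ones(4, 3)
dist = t.distributions.Normal(mu, sigma)
action = dist.sample()                 # shape (4, 3)

per_component = dist.log_prob(action)  # shape (4, 3): one logprob per component
joint_logprob = per_component.sum(-1)  # shape (4,): logprob of the full action
entropy = dist.entropy().sum(-1)       # entropy summed over components the same way
```

Exponentiating joint_logprob recovers the product of the per-component probabilities, which is exactly the independence assumption in the equations above.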

Logging

You should log mu and sigma during the learning phase.

Below, we've given you a template for all the things you'll need to change (with new class & function names so they don't overwrite the previous versions).

class PPOAgentCts(PPOAgent):
    def play_step(self) -> list[dict]:
        """
        Changes required:
            - actor returns (mu, sigma, dist), with dist used to sample actions
            - logprobs need to be summed over action space
        """
        raise NotImplementedError()


def calc_clipped_surrogate_objective_cts(
    dist: t.distributions.Normal,
    mb_action: Int[Tensor, " minibatch_size *action_shape"],
    mb_advantages: Float[Tensor, " minibatch_size"],
    mb_logprobs: Float[Tensor, " minibatch_size"],
    clip_coef: float,
    eps: float = 1e-8,
) -> Float[Tensor, ""]:
    """
    Changes required:
        - logprobs need to be summed over action space
    """
    assert (mb_action.shape[0],) == mb_advantages.shape == mb_logprobs.shape

    raise NotImplementedError()


def calc_entropy_bonus_cts(dist: t.distributions.Normal, ent_coef: float):
    """
    Changes required:
        - entropy needs to be summed over action space before taking mean
    """
    raise NotImplementedError()


class PPOTrainerCts(PPOTrainer):
    def __init__(self, args: PPOArgs):
        super().__init__(args)
        self.agent = PPOAgentCts(self.envs, self.actor, self.critic, self.memory)

    def compute_ppo_objective(self, minibatch: ReplayMinibatch) -> Float[Tensor, ""]:
        """
        Changes required:
            - actor returns (mu, sigma, dist), with dist used for loss functions (rather than
                getting dist from logits)
            - objective function calculated using new `_cts` functions defined above
            - newlogprob (for logging) needs to be summed over action space
            - mu and sigma should be logged
        """
        raise NotImplementedError()
Solution
class PPOAgentCts(PPOAgent):
    def play_step(self) -> list[dict]:
        """
        Changes required:
            - actor returns (mu, sigma, dist), with dist used to sample actions
            - logprobs need to be summed over action space
        """
        obs = self.next_obs
        terminated = self.next_terminated

        with t.inference_mode():
            # DISCRETE VERSION: derive distribution from the logits
            # dist = Categorical(logits=self.actor(obs))
            # CONTINUOUS VERSION: return distribution object directly
            _, _, dist = self.actor.forward(obs)

        actions = dist.sample()

        next_obs, rewards, next_terminated, next_truncated, infos = self.envs.step(actions.cpu().numpy())

        # DISCRETE VERSION: no need to sum logprobs
        # logprobs = dist.log_prob(actions).cpu().numpy()
        # CONTINUOUS VERSION: logprobs need to be summed over action space
        logprobs = dist.log_prob(actions).sum(-1).cpu().numpy()

        with t.inference_mode():
            values = self.critic(obs).flatten().cpu().numpy()
        self.memory.add(
            obs.cpu().numpy(),
            actions.cpu().numpy(),
            logprobs,
            values,
            rewards,
            terminated.cpu().numpy(),
        )

        self.next_obs = t.from_numpy(next_obs).to(device, dtype=t.float)
        self.next_terminated = t.from_numpy(next_terminated).to(device, dtype=t.float)

        self.step += self.envs.num_envs
        return infos


def calc_clipped_surrogate_objective_cts(
    dist: t.distributions.Normal,
    mb_action: Int[Tensor, " minibatch_size *action_shape"],
    mb_advantages: Float[Tensor, " minibatch_size"],
    mb_logprobs: Float[Tensor, " minibatch_size"],
    clip_coef: float,
    eps: float = 1e-8,
) -> Float[Tensor, ""]:
    """
    Changes required:
        - logprobs need to be summed over action space
    """
    assert (mb_action.shape[0],) == mb_advantages.shape == mb_logprobs.shape

    # DISCRETE VERSION: no need to sum logprobs
    # logits_diff = probs.log_prob(mb_action) - mb_logprobs
    # CONTINUOUS VERSION: logprobs need to be summed over action space
    logits_diff = dist.log_prob(mb_action).sum(-1) - mb_logprobs

    r_theta = t.exp(logits_diff)

    mb_advantages = (mb_advantages - mb_advantages.mean()) / (mb_advantages.std() + eps)

    non_clipped = r_theta * mb_advantages
    clipped = t.clip(r_theta, 1 - clip_coef, 1 + clip_coef) * mb_advantages

    return t.minimum(non_clipped, clipped).mean()


def calc_entropy_bonus_cts(dist: t.distributions.Normal, ent_coef: float):
    """
    Changes required:
        - entropy needs to be summed over action space before taking mean
    """
    # DISCRETE VERSION: no need to sum entropy
    # return ent_coef * dist.entropy().mean()
    # CONTINUOUS VERSION: entropy needs to be summed over action space
    return ent_coef * dist.entropy().sum(-1).mean()


class PPOTrainerCts(PPOTrainer):
    def __init__(self, args: PPOArgs):
        super().__init__(args)
        self.agent = PPOAgentCts(self.envs, self.actor, self.critic, self.memory)

    def compute_ppo_objective(self, minibatch: ReplayMinibatch) -> Float[Tensor, ""]:
        """
        Changes required:
            - actor returns (mu, sigma, dist), with dist used for loss functions (rather than
                getting dist from logits)
            - objective function calculated using new `_cts` functions defined above
            - newlogprob (for logging) needs to be summed over action space
            - mu and sigma should be logged
        """
        # DISCRETE VERSION: actor returns logits, and we derive dist from them
        # dist = Categorical(logits=self.actor(minibatch.obs))
        # CONTINUOUS VERSION: actor returns distribution object directly
        mu, sigma, dist = self.agent.actor(minibatch.obs)

        values = self.agent.critic(minibatch.obs).squeeze()

        clipped_surrogate_objective = calc_clipped_surrogate_objective_cts(
            dist, minibatch.actions, minibatch.advantages, minibatch.logprobs, self.args.clip_coef
        )
        value_loss = calc_value_function_loss(values, minibatch.returns, self.args.vf_coef)
        entropy_bonus = calc_entropy_bonus_cts(dist, self.args.ent_coef)
        total_objective_function = clipped_surrogate_objective - value_loss + entropy_bonus

        with t.inference_mode():
            # DISCRETE VERSION: no need to sum logprobs
            # newlogprob = dist.log_prob(minibatch.actions)
            # CONTINUOUS VERSION: logprobs need to be summed over action space
            newlogprob = dist.log_prob(minibatch.actions).sum(-1)
            logratio = newlogprob - minibatch.logprobs
            ratio = logratio.exp()
            approx_kl = (ratio - 1 - logratio).mean().item()
            clipfracs = [((ratio - 1.0).abs() > self.args.clip_coef).float().mean().item()]
        if self.args.use_wandb:
            wandb.log(
                dict(
                    total_steps=self.agent.step,
                    values=values.mean().item(),
                    lr=self.scheduler.optimizer.param_groups[0]["lr"],
                    value_loss=value_loss.item(),
                    clipped_surrogate_objective=clipped_surrogate_objective.item(),
                    entropy=entropy_bonus.item(),
                    approx_kl=approx_kl,
                    clipfrac=np.mean(clipfracs),
                    # CONTINUOUS VERSION: log mu and sigma
                    mu=mu.mean().item(),
                    sigma=sigma.mean().item(),
                ),
                step=self.agent.step,
            )

        return total_objective_function

Training MuJoCo

Now, you should be ready to run your training loop! We recommend using the following parameters, to match the original implementation which the 37 Implementational Details post is based on (but you can experiment with different values if you like).

args = PPOArgs(
    env_id="Hopper-v4",
    wandb_project_name="PPOMuJoCo",
    use_wandb=True,
    mode="mujoco",
    lr=3e-4,
    ent_coef=0.0,
    num_minibatches=32,
    num_steps_per_rollout=2048,
    num_envs=1,
    video_log_freq=75,
)
trainer = PPOTrainerCts(args)
trainer.train()

You should expect the reward to increase pretty fast initially and then plateau once the agent learns the solution "kick off for a very large initial jump, and don't think about landing". Eventually the agent gets past this plateau, and learns to land successfully without immediately falling over. Once it's at the point where it can string two jumps together, your reward should start increasing much faster.

Here is a video produced from a successful run, using the parameters above:

and here's the corresponding plot of episode lengths:

Although we've used Hopper-v4 in these examples, you might also want to try InvertedPendulum-v4 (docs here). It's a much easier environment to solve, and it's a good way to check that your implementation is working (after all if it worked for CartPole then it should work here - in fact your inverted pendulum agent should converge to a perfect solution almost instantly, no reward shaping required). You can check out the other MuJoCo environments here.