Exercise Status: All exercises complete and verified

3️⃣ Multi-Armed Bandit

Learning Objectives
  • Compare Upper Confidence Bound (UCB) methods to other exploration methods.

This material used to be core, but we've moved it to the bonus section, as nothing later on depends on it. Feel free to skip it, and look through the optional readings above if you prefer.

Readings

  • Sutton and Barto, Chapter 2 (25 minutes)
  • Section 2.1: A k-armed Bandit Problem through to Section 2.7 Upper-Confidence-Bound Action Selection
  • We won't cover Gradient Bandits. Don't worry about these.
  • Don't worry too much about all the math for the moment if you can't follow it. The earlier sections are more important.

The info dictionary

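The environment's step method returns five values: obs, reward, terminated, truncated, and the info dictionary. (Older versions of gym returned four, with a single done flag in place of terminated and truncated.)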

info can contain anything extra that doesn't fit into the uniform interface - it's up to the environment what to put into it. A good use of this is for debugging information that the agent isn't "supposed" to see. In this case, we'll return the index of the actual best arm. This would allow us to measure how often the agent chooses the best arm, but it would also allow us to build a "cheating" agent that peeks at this information to make its decision.

Cheating agents are helpful because we know that they should obtain the maximum possible rewards; if they aren't, then there's a bug.
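To make the debugging use of info concrete, here is a minimal pure-Python sketch. ToyBandit, average_reward, and the fixed arm means are hypothetical stand-ins (not the MultiArmedBandit class defined below); the toy only mimics the (obs, reward, terminated, truncated, info) tuple that the environment's step method returns.

```python
import random


class ToyBandit:
    """Hypothetical stand-in for a bandit env; it only mimics the step return tuple."""

    def __init__(self, arm_means, seed=0):
        self.arm_means = list(arm_means)
        self.rng = random.Random(seed)
        self.best_arm = max(range(len(self.arm_means)), key=self.arm_means.__getitem__)

    def step(self, arm):
        reward = self.rng.gauss(self.arm_means[arm], 1.0)
        info = {"best_arm": self.best_arm}  # debugging data an "honest" agent should ignore
        return 0, reward, False, False, info


def average_reward(env, choose_arm, n_steps=2000):
    """Pull n_steps times; choose_arm receives the previous step's info dict (or None)."""
    total, info = 0.0, None
    for _ in range(n_steps):
        _, reward, _, _, info = env.step(choose_arm(info))
        total += reward
    return total / n_steps


means = [0.1, 1.5, -0.3]
cheater_avg = average_reward(ToyBandit(means), lambda info: info["best_arm"] if info else 0)
fixed_avg = average_reward(ToyBandit(means), lambda info: 0)
print(f"cheater: {cheater_avg:.2f}, always arm 0: {fixed_avg:.2f}")
```

The cheater's average should sit close to the best arm's mean (1.5 in this toy setup). If it did not, that would point to a bug in the environment or the loop rather than in the agent.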

The render() method

Render is only used for debugging or entertainment, and what it does is up to the environment. It might render a little popup window showing the Atari game, or it might give you an RGB image tensor, or just some ASCII text describing what's happening. In this case, we'll just make a little plot showing the distribution of rewards for each arm of the bandit.

Observation and Action Types

A gym.Env is a generic type: both the type of the observations and the type of the actions depend on the specifics of the environment.

We're only dealing with the simplest case: a discrete set of actions which are the same in every state (and ditto for observations). In general, the actions could be continuous, or depend on the state.

ObsType = int
ActType = int


class MultiArmedBandit(gym.Env):
    """
    A class representing a multi-armed bandit environment, based on Gymnasium's Env class.

    Attributes:
        action_space (gym.spaces.Discrete): The space of possible actions, representing the arms of
            the bandit.
        observation_space (gym.spaces.Discrete): The space of possible observations.
        num_arms (int): The number of arms in the bandit.
        stationary (bool): Indicates whether the reward distribution (i.e. the arm_reward_means) is
            stationary or not.
        arm_reward_means (np.ndarray): The mean rewards for each arm.
    """

    action_space: gym.spaces.Discrete
    observation_space: gym.spaces.Discrete
    num_arms: int
    stationary: bool
    arm_reward_means: np.ndarray

    def __init__(self, num_arms=10, stationary=True):
        """
        Initializes the MultiArmedBandit environment.

        Args:
            num_arms (int): The number of arms for the bandit. Defaults to 10.
            stationary (bool): Whether the bandit has a stationary reward distribution.
        """
        super().__init__()
        self.num_arms = num_arms
        self.stationary = stationary
        self.observation_space = gym.spaces.Discrete(1)
        self.action_space = gym.spaces.Discrete(num_arms)
        self.reset()

    def step(self, arm: ActType) -> tuple[ObsType, float, bool, bool, dict]:
        """
        Takes an action by choosing an arm and returns the result of the action.

        Args:
            arm (ActType): The selected arm to pull in the bandit.

        Returns:
            obs (ObsType): The observation.
            reward (float): The reward.
            terminated (bool): Whether episode has terminated, i.e. for non-timeout related reasons.
            truncated (bool): Whether episode has timed out.
            info (dict): Additional information.
        """
        assert self.action_space.contains(arm)
        if not self.stationary:
            q_drift = self.np_random.normal(loc=0.0, scale=0.01, size=self.num_arms)
            self.arm_reward_means += q_drift
            self.best_arm = int(np.argmax(self.arm_reward_means))
        reward = self.np_random.normal(loc=self.arm_reward_means[arm], scale=1.0)
        obs = 0
        terminated = False
        truncated = False
        info = dict(best_arm=self.best_arm)
        return (obs, reward, terminated, truncated, info)

    def reset(self, seed: int | None = None, options=None) -> tuple[ObsType, dict]:
        """
        Resets the environment to its initial state.

        Args:
            seed (int | None): The seed for random number generation. Defaults to None.
            options (dict): Additional options for environment reset. Defaults to None.

        Returns:
            obs (ObsType): The initial observation.
            info (dict): Additional information.
        """
        super().reset(seed=seed)
        if self.stationary:
            self.arm_reward_means = self.np_random.normal(loc=0.0, scale=1.0, size=self.num_arms)
        else:
            self.arm_reward_means = np.zeros(shape=[self.num_arms])
        self.best_arm = int(np.argmax(self.arm_reward_means))

        obs = 0
        info = {}
        return obs, info

    def render(self, mode="human"):
        """
        Renders the state of the environment, in the form of a violin plot.
        """
        assert mode == "human", f"Mode {mode} not supported!"
        bandit_samples = []
        for arm in range(self.action_space.n):
            bandit_samples += [np.random.normal(loc=self.arm_reward_means[arm], scale=1.0, size=1000)]
        plt.violinplot(bandit_samples, showmeans=True)
        plt.xlabel("Bandit Arm")
        plt.ylabel("Reward Distribution")
        plt.show()

Registering an Environment

Usually we won't use a class like MultiArmedBandit directly, because we'll want to wrap it in classes that add various other features that help it work with the rest of the gymnasium library. The register function does this for us: it stores information about our Env in a registry so that a later call to gym.make can look it up using the id string that is passed in, and then create an instance of our class with a bunch of extra wrapper classes.

By convention, the id strings have a suffix with a version number. There can be multiple versions of the "same" environment with different parameters, and benchmarks should always report the version number for a fair comparison. For instance, id="ArmedBanditTestbed-v0" below.

TimeLimit wrapper

One important wrapper class is the TimeLimit wrapper - this is used to ensure that the environment terminates after a fixed number of steps. It works by having the truncated flag returned by the step method be overridden after max_episode_steps, and set to True. You can see from looking at the step method above that without this, the agent might keep playing forever, because it always returns terminated=truncated=False. The time limit is also an essential part of the problem definition: if it were larger or shorter, there would be more or less time to explore, which means that different algorithms (or at least different hyperparameters) would then have improved performance.
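The truncation mechanism described above can be sketched in a few lines. This is an illustrative toy, not gymnasium's actual TimeLimit implementation; CountingEnv and ToyTimeLimit are hypothetical names, and the sketch assumes the wrapped environment follows the five-value step convention.

```python
class CountingEnv:
    """Hypothetical env that never ends on its own (like the bandit above)."""

    def reset(self):
        return 0, {}

    def step(self, action):
        return 0, 1.0, False, False, {}


class ToyTimeLimit:
    """Illustrative wrapper: forces truncated=True once max_episode_steps
    steps have elapsed since the last reset."""

    def __init__(self, env, max_episode_steps):
        self.env = env
        self.max_episode_steps = max_episode_steps
        self.elapsed_steps = 0

    def reset(self):
        self.elapsed_steps = 0
        return self.env.reset()

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.elapsed_steps += 1
        if self.elapsed_steps >= self.max_episode_steps:
            truncated = True  # override the inner env's flag
        return obs, reward, terminated, truncated, info


env = ToyTimeLimit(CountingEnv(), max_episode_steps=5)
env.reset()
n_steps, done = 0, False
while not done:
    _, _, terminated, truncated, _ = env.step(0)
    done = terminated or truncated
    n_steps += 1
print(n_steps)  # 5
```

Note that terminated stays False throughout: the episode ends only because of the time limit, which is exactly the distinction the two flags are meant to capture.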

A list of the other wrappers you'll see printed below (these are less important):

  • OrderEnforcing ensures the environment is reset before we start using it
  • PassiveEnvChecker surrounds the step, reset and render functions to check they follow Gymnasium’s API

gym.envs.registration.register(
    id="ArmedBanditTestbed-v0",
    entry_point=MultiArmedBandit,
    max_episode_steps=max_episode_steps,
    nondeterministic=True,
    reward_threshold=1.0,
    kwargs={"num_arms": 10, "stationary": True},
)

env = gym.make("ArmedBanditTestbed-v0")
print(f"Our env inside its wrappers looks like: {env}")
Our env inside its wrappers looks like: <TimeLimit<OrderEnforcing<PassiveEnvChecker<MultiArmedBandit<ArmedBanditTestbed-v0>>>>>

A Note on (pseudo) RNGs

The PRNG that gym.Env provides as self.np_random is from the PCG family. In RL code, you often need massive quantities of pseudorandomly generated numbers, so it's important to have a generator that is both very fast and has good quality output.

When you call np.random.randint or similar, you're using the old-school Mersenne Twister algorithm, which is both slower than PCG and produces lower-quality output. Since NumPy 1.17, you can use np.random.default_rng() to get a PCG generator and then use its integers method to get random integers.
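As a short illustration of the newer Generator interface, the sketch below draws random arm indices and shows that two generators built from the same seed produce identical streams. The seed values and array sizes are arbitrary choices for the example.

```python
import numpy as np

# default_rng returns a np.random.Generator (backed by a PCG bit generator)
rng = np.random.default_rng(seed=0)
print(type(rng.bit_generator).__name__)

# integers() excludes the upper bound, so this samples from {0, ..., 9}
arms = rng.integers(low=0, high=10, size=5)

# Two generators with the same seed yield the same sequence, which is what
# makes seeded runs reproducible
rng_a = np.random.default_rng(42)
rng_b = np.random.default_rng(42)
same = np.array_equal(rng_a.integers(0, 10, size=5), rng_b.integers(0, 10, size=5))
print(same)
```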

Agent class

Now, you'll start defining agents to act within this environment.

You'll be defining your agents as subclasses of the base class Agent for agents in a multi-armed bandit environment, given below. It contains the following methods:

  • get_action, which returns the agent's next action. This doesn't take state as an argument, because the state of the multi-armed bandit game is always the same.
    • This isn't implemented for Agent, because we'll implement it separately for each agent we build on top of the base agent.
  • observe, which determines what the model does when it observes a particular action & reward combination.
    • The default behaviour is to do nothing, although for some agents we'll implement more complicated behaviour.
  • reset, which is run before each episode starts.
    • All that the base method does is reset the random number generator, but again future agents might need to do more, e.g. reset data gathered from the observe method.

Additionally, the run_episode function will run a single episode of interaction between an agent and the environment. As discussed above, this will terminate after max_episode_steps for our environment (we've set this to be 1000). The run_agent function calls run_episode multiple times in series.

class Agent:
    """
    Base class for agents in a multi-armed bandit environment

    (you do not need to add any implementation here)
    """

    rng: np.random.Generator

    def __init__(self, num_arms: int, seed: int):
        self.num_arms = num_arms
        self.reset(seed)

    def get_action(self) -> ActType:
        raise NotImplementedError()

    def observe(self, action: ActType, reward: float, info: dict) -> None:
        pass

    def reset(self, seed: int) -> None:
        self.rng = np.random.default_rng(seed)


def run_episode(env: gym.Env, agent: Agent, seed: int) -> tuple[Arr, Arr]:
    """
    Runs a single episode of interaction between an agent and an environment.

    Args:
        env (gym.Env): The environment in which the agent operates.
        agent (Agent): The agent that takes actions in the environment.
        seed (int): The seed for random number generation to ensure reproducibility.

    Returns:
        A tuple of two arrays: the reward received at each step, and a 0/1 flag per step
        indicating whether the chosen arm was the best arm.
    """
    (rewards, was_best) = ([], [])

    env.reset(seed=seed)
    agent.reset(seed=seed)

    done = False
    while not done:
        arm = agent.get_action()
        obs, reward, terminated, truncated, info = env.step(arm)
        done = terminated or truncated
        agent.observe(arm, reward, info)
        rewards.append(reward)
        was_best.append(1 if arm == info["best_arm"] else 0)

    rewards = np.array(rewards, dtype=float)
    was_best = np.array(was_best, dtype=int)
    return (rewards, was_best)


def run_agent(env: gym.Env, agent: Agent, n_runs=200, base_seed=1) -> tuple[Arr, Arr]:
    all_rewards = []
    all_was_bests = []
    base_rng = np.random.default_rng(base_seed)
    for n in tqdm(range(n_runs)):
        seed = base_rng.integers(low=0, high=10_000, size=1).item()
        (rewards, corrects) = run_episode(env, agent, seed)
        all_rewards.append(rewards)
        all_was_bests.append(corrects)
    return np.array(all_rewards), np.array(all_was_bests)

Exercise - implement RandomAgent

Difficulty: 🔴🔴⚪⚪⚪
Importance: 🔵🔵⚪⚪⚪
You should spend up to 10-20 minutes on this exercise.

You should fill in the get_action method from RandomAgent, which is a subclass of Agent. This agent should pick an arm at random, i.e. from the range [0, ..., num_arms-1], and return it as an integer. You won't need to change the observe method since the random agent doesn't change its behaviour based on observations, and you won't need to change the reset method since there are no observations which need to be reset.

This agent will be useful as a baseline to ensure the environment has no bugs. If your later agents are doing worse than random, you have a bug! Later, we'll add some smarter agents.

When you've finished implementing this agent, running the cell below will:

  • Verify that RandomAgent pulls the optimal arm with frequency roughly 1/num_arms.
  • Verify that the average reward is very roughly zero. This is the case since each arm's mean reward is drawn from a distribution centered on zero.

class RandomAgent(Agent):
    def get_action(self) -> ActType:
        raise NotImplementedError("Implement the get_action method for the RandomAgent class.")

    def __repr__(self):
        return "RandomAgent"


num_arms = 10
stationary = True
env = gym.make("ArmedBanditTestbed-v0", num_arms=num_arms, stationary=stationary)
agent = RandomAgent(num_arms, 0)
all_rewards, all_corrects = run_agent(env, agent)

print(f"Expected correct freq: {1 / 10}, actual: {all_corrects.mean():.6f}")
assert np.isclose(all_corrects.mean(), 1 / 10, atol=0.05), "Random agent is not random enough!"

print(f"Expected average reward: 0.0, actual: {all_rewards.mean():.6f}")
assert np.isclose(all_rewards.mean(), 0, atol=0.05), (
    "Random agent should be getting mean arm reward, which is zero."
)

print("All tests passed!")
Solution
class RandomAgent(Agent):
    def get_action(self) -> ActType:
        return self.rng.integers(low=0, high=self.num_arms).item()

    def __repr__(self):
        return "RandomAgent"

Exercise - Implement reward averaging

Difficulty: 🔴🔴🔴⚪⚪
Importance: 🔵🔵⚪⚪⚪
You should spend up to 20-30 minutes on this exercise.

You should now complete the methods for the RewardAveraging agent, which applies the reward averaging algorithm as detailed in Sutton and Barto section 2.4, "Incremental Implementation":

  • You should fill in observe to keep track of the number of times $n$ each arm has been pulled, as well as the value $Q_n$ for each arm.
    • Gotcha - in S & B notation, $n$ is the number of times this particular arm has been pulled, not the total number of actions taken!
    • We recommend defining arrays N and Q, each of length num_arms, to keep track of all these values.
  • You should fill in get_action with an epsilon-greedy method: taking a random action with probability epsilon, and taking the best action based on the current value of $Q$ with probability 1-epsilon (see Sutton & Barto).
  • You should fill in reset to call the reset method from the parent class, and make sure that the tracked values of $(n, Q_n)$ are set back to appropriate values at the start of each episode.
    • Note, the reset method is also called before the very first run, so you don't need to define N and Q in the init method.
    • The Q values should be initialized according to the optimism value of this agent.
    • Ensure the Q values are stored as floats. If optimism is an integer, np.full will otherwise infer an integer dtype from it.

Hint - average reward formula

$$Q_k = Q_{k-1} + \frac{1}{k}[R_k - Q_{k-1}]$$

Where $k$ is the number of times the action has been taken, $R_k$ is the reward from the kth time the action was taken, and $Q_{k-1}$ is the average reward from the previous times this action was taken (this notation departs slightly from the S&B notation, but may be more helpful for our implementation).

Important - $k$ is not the total number of timesteps, it's the total number of times you've taken this particular action.
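As a quick sanity check on the formula, the sketch below applies the incremental update to a stream of rewards from a single arm and compares the result with the ordinary mean. The reward distribution and sample size are arbitrary assumptions for the example.

```python
import random
import statistics

rng = random.Random(0)
rewards = [rng.gauss(1.0, 1.0) for _ in range(1000)]  # rewards from ONE arm

q, k = 0.0, 0
for r in rewards:
    k += 1  # k counts pulls of this arm, not total timesteps
    q += (r - q) / k  # Q_k = Q_{k-1} + (1/k) * (R_k - Q_{k-1})

batch_mean = statistics.fmean(rewards)
print(abs(q - batch_mean) < 1e-9)
```

Note that the first update (k = 1) overwrites the initial value entirely, since $Q_1 = Q_0 + (R_1 - Q_0) = R_1$. With sample averaging, an optimistic initial estimate therefore only affects an arm until its first pull.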

We've given you a function for plotting multiple agents' reward trajectories on the same graph, with an optional moving average parameter to make the graph smoother.

class RewardAveraging(Agent):
    def __init__(self, num_arms: int, seed: int, epsilon: float, optimism: float):
        self.epsilon = epsilon
        self.optimism = optimism
        super().__init__(num_arms, seed)

    def get_action(self):
        raise NotImplementedError("Implement the get_action method for the RewardAveraging class.")

    def observe(self, action, reward, info):
        raise NotImplementedError("Implement the observe method for the RewardAveraging class.")

    def reset(self, seed: int):
        raise NotImplementedError("Implement the reset method for the RewardAveraging class.")

    def __repr__(self):
        # For the legend, when plotting
        return f"RewardAveraging(eps={self.epsilon}, optimism={self.optimism})"


num_arms = 10
stationary = True
names = []
all_rewards = []
env = gym.make("ArmedBanditTestbed-v0", num_arms=num_arms, stationary=stationary)

for optimism in [0, 5]:
    agent = RewardAveraging(num_arms, 0, epsilon=0.01, optimism=optimism)
    (rewards, num_correct) = run_agent(env, agent, n_runs=N_RUNS, base_seed=1)
    all_rewards.append(rewards)
    names.append(str(agent))
    print(agent)
    print(f" -> Frequency of correct arm: {num_correct.mean():.4f}")
    print(f" -> Average reward: {rewards.mean():.4f}")

utils.plot_rewards(all_rewards, names, moving_avg_window=15)
Question - can you interpret these results?

At the very start, the more optimistic agent performs worse, because it explores more and exploits less (you can see this at the very beginning of the plot if you zoom in). Its estimates are wildly over-optimistic, so even if it finds a good arm, its Q-value for that arm will decrease. On the other hand, if the realistic agent finds a good arm early on, it'll probably return to exploit it.

However, the optimistic agent eventually outperforms the realistic agent, because its increased exploration means it's more likely to converge on the best arm.

Question - how do you think these results would change if epsilon was decreased for both agents?

You should expect the optimistic agent to outperform the realistic agent even more. The smaller epsilon is, the more necessary optimism is (because without it the agent won't explore enough).
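The limiting case epsilon = 0 makes this argument easy to see. The sketch below is a simplified illustration, not the exercise solution: greedy_pulls is a hypothetical helper, the arm means are made up, and rewards are noise-free so that the outcome is deterministic.

```python
def greedy_pulls(arm_means, optimism, n_steps):
    """Purely greedy (epsilon = 0) sample-average agent on noise-free rewards."""
    n_arms = len(arm_means)
    counts = [0] * n_arms
    q = [float(optimism)] * n_arms
    pulls = []
    for _ in range(n_steps):
        arm = max(range(n_arms), key=q.__getitem__)  # ties go to the lowest index
        reward = arm_means[arm]  # assumption: no noise, to keep the example deterministic
        counts[arm] += 1
        q[arm] += (reward - q[arm]) / counts[arm]
        pulls.append(arm)
    return pulls


arm_means = [0.5, 2.0, -1.0, 1.0]
realistic = greedy_pulls(arm_means, optimism=0, n_steps=20)
optimistic = greedy_pulls(arm_means, optimism=5, n_steps=20)
print(sorted(set(realistic)))  # arms the realistic agent ever tried
print(optimistic[:4], optimistic[-1])  # optimistic agent's first pulls and final choice
```

With no random exploration, the realistic agent locks onto the first arm that beats its initial estimate of zero, while the optimistic agent is "disappointed" by every arm in turn and so tries them all before settling on the best one.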

Solution
class RewardAveraging(Agent):
    def __init__(self, num_arms: int, seed: int, epsilon: float, optimism: float):
        self.epsilon = epsilon
        self.optimism = optimism
        super().__init__(num_arms, seed)

    def get_action(self):
        if self.rng.random() < self.epsilon:
            return self.rng.integers(low=0, high=self.num_arms).item()
        else:
            return np.argmax(self.Q)

    def observe(self, action, reward, info):
        self.N[action] += 1
        self.Q[action] += (reward - self.Q[action]) / self.N[action]

    def reset(self, seed: int):
        super().reset(seed)
        self.N = np.zeros(self.num_arms)
        self.Q = np.full(self.num_arms, self.optimism, dtype=float)

    def __repr__(self):
        # For the legend, when plotting
        return f"RewardAveraging(eps={self.epsilon}, optimism={self.optimism})"

Exercise - implement cheater agent

Difficulty: 🔴🔴⚪⚪⚪
Importance: 🔵🔵🔵⚪⚪
You should spend up to 5-15 minutes on this exercise. It's important for you to understand why cheater agents are important for debugging.

The cheater agent will always choose the best arm. It's a good idea to implement, because you can compare your other agents to it to make sure they're not doing better than the cheater (if they are, you probably have a bug!).

You should fill in the methods get_action and observe below. The cheater agent will always choose the best arm available to it (remember that the best arm is stored in the info dictionary returned by each step - see the step method in the MultiArmedBandit class).

class CheatyMcCheater(Agent):
    def __init__(self, num_arms: int, seed: int):
        super().__init__(num_arms, seed)
        self.best_arm = 0

    def get_action(self):
        raise NotImplementedError("Implement the get_action method for the CheatyMcCheater class.")

    def observe(self, action: int, reward: float, info: dict):
        raise NotImplementedError("Implement the observe method for the CheatyMcCheater class.")

    def __repr__(self):
        return "Cheater"


cheater = CheatyMcCheater(num_arms, 0)
reward_averaging = RewardAveraging(num_arms, 0, epsilon=0.1, optimism=0)
random = RandomAgent(num_arms, 0)

names = []
all_rewards = []

for agent in [cheater, reward_averaging, random]:
    (rewards, num_correct) = run_agent(env, agent, n_runs=N_RUNS, base_seed=1)
    names.append(str(agent))
    all_rewards.append(rewards)

utils.plot_rewards(all_rewards, names, moving_avg_window=15)

assert (all_rewards[0] < all_rewards[1]).mean() < 0.001, "Cheater should be better than reward averaging"
print("Tests passed!")
Solution
class CheatyMcCheater(Agent):
    def __init__(self, num_arms: int, seed: int):
        super().__init__(num_arms, seed)
        self.best_arm = 0

    def get_action(self):
        return self.best_arm

    def observe(self, action: int, reward: float, info: dict):
        self.best_arm = info["best_arm"]

    def __repr__(self):
        return "Cheater"

The Authentic RL Experience

It would be nice if we could say something like "optimistic reward averaging is a good/bad feature that improves/decreases performance in bandit problems." Unfortunately, we can't justifiably claim either at this point.

Usually, RL code fails silently, which makes it difficult to be confident that you don't have any bugs. I had a bug in my first attempt that made both versions appear to perform equally, and there were only 13 lines of code, and I had written similar code before.

The number of hyperparameters also grows rapidly, and hyperparameters have interactions with each other. Even in this simple problem, we already have two different ways to encourage exploration (epsilon and optimism), and it's not clear whether it's better to use one or the other, or both in some combination. It's actually worse than that, because epsilon should probably be annealed down at some rate.

Even in this single comparison, we trained 200 agents for each version. Is that a big number or a small number to estimate the effect size? Probably we should like, compute some statistics? And test with a different number of arms - maybe we need more exploration for more arms? The time needed for a rigorous evaluation is going to increase quickly.
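One rough way to start answering the "is 200 runs enough?" question is to put an approximate confidence interval on each variant's mean. The sketch below uses synthetic numbers, not real results; mean_with_ci is a hypothetical helper, and it assumes that runs are independent and that a normal approximation is reasonable.

```python
import math
import random
import statistics


def mean_with_ci(per_run_scores, z=1.96):
    """Mean and approximate 95% confidence half-width, treating runs as independent."""
    mean = statistics.fmean(per_run_scores)
    sem = statistics.stdev(per_run_scores) / math.sqrt(len(per_run_scores))
    return mean, z * sem


# Hypothetical per-run average rewards for two agent variants (synthetic data).
rng = random.Random(0)
scores_a = [rng.gauss(1.30, 0.5) for _ in range(200)]
scores_b = [rng.gauss(1.35, 0.5) for _ in range(200)]

for name, scores in [("A", scores_a), ("B", scores_b)]:
    mean, half_width = mean_with_ci(scores)
    print(f"variant {name}: {mean:.3f} +/- {half_width:.3f}")
```

In this made-up setting the half-width comes out at roughly 0.07, which is larger than the true gap of 0.05 between the variants, so 200 runs would not be enough to separate them. With the arrays returned by run_agent, the per-run scores would be something like all_rewards.mean(axis=1).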

As we continue onward to more complicated algorithms, keep an eye out for small discrepancies or minor confusions. Look for opportunities to check and cross-check everything, and be humble with claims.

Exercise - implement UCB action selection

Difficulty: 🔴🔴🔴⚪⚪
Importance: 🔵🔵⚪⚪⚪
You should spend up to 15-30 minutes on this exercise.

Once you feel good about your RewardAveraging implementation, you should implement UCBActionSelection.

This should store the same running average reward for each action as RewardAveraging did, but instead of taking actions using the epsilon-greedy strategy, it should use Equation 2.10 in Section 2.7 of Sutton and Barto to select actions using the upper confidence bound.

You should expect to see a small improvement over RewardAveraging using this strategy.

Tip - be careful of division-by-zero errors!
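
For reference, the UCB selection rule from Sutton and Barto can be written as follows (check the book for the exact statement and surrounding discussion):

$$A_t = \underset{a}{\arg\max} \left[ Q_t(a) + c \sqrt{\frac{\ln t}{N_t(a)}} \right]$$

Here $t$ is the current timestep, $N_t(a)$ is the number of times action $a$ has been selected before time $t$, and $c > 0$ controls the amount of exploration. When $N_t(a) = 0$, the action $a$ is treated as a maximizing action, which is the case where the division by zero arises.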

class UCBActionSelection(Agent):
    def __init__(self, num_arms: int, seed: int, c: float, eps: float = 1e-6):
        super().__init__(num_arms, seed)
        self.c = c
        self.eps = eps

    def get_action(self):
        raise NotImplementedError()

    def observe(self, action, reward, info):
        raise NotImplementedError()

    def reset(self, seed: int):
        super().reset(seed)
        raise NotImplementedError()

    def __repr__(self):
        return f"UCB(c={self.c})"


cheater = CheatyMcCheater(num_arms, 0)
reward_averaging = RewardAveraging(num_arms, 0, epsilon=0.1, optimism=0)
reward_averaging_optimism = RewardAveraging(num_arms, 0, epsilon=0.1, optimism=5)
ucb = UCBActionSelection(num_arms, 0, c=2.0)
random = RandomAgent(num_arms, 0)

names = []
all_rewards = []

for agent in [cheater, reward_averaging, reward_averaging_optimism, ucb, random]:
    (rewards, num_correct) = run_agent(env, agent, n_runs=N_RUNS, base_seed=1)
    names.append(str(agent))
    all_rewards.append(rewards)

utils.plot_rewards(all_rewards, names, moving_avg_window=15)
Solution
class UCBActionSelection(Agent):
    def __init__(self, num_arms: int, seed: int, c: float, eps: float = 1e-6):
        super().__init__(num_arms, seed)
        self.c = c
        self.eps = eps

    def get_action(self):
        # Adding eps to N avoids division by zero. Once t > 1, untried arms (N=0) get a huge
        # exploration bonus, so argmax picks them first (at t=1, log(t)=0 and all bonuses vanish)
        ucb = self.Q + self.c * np.sqrt(np.log(self.t) / (self.N + self.eps))
        return np.argmax(ucb)

    def observe(self, action, reward, info):
        self.t += 1
        self.N[action] += 1
        self.Q[action] += (reward - self.Q[action]) / self.N[action]

    def reset(self, seed: int):
        super().reset(seed)
        self.t = 1
        self.N = np.zeros(self.num_arms)
        self.Q = np.zeros(self.num_arms)

    def __repr__(self):
        return f"UCB(c={self.c})"

Bonus

Here are a few bonus exercises you can try if you're interested. You could instead progress to the second section, and return to them if you have time.

  • Implement the gradient bandit algorithm.
  • Implement an environment and an agent for the contextual bandit problem.
  • Complete the exercises at the end of Chapter 2 of Sutton and Barto.