PPO Experiment with Atari Breakout

This experiment trains a Proximal Policy Optimization (PPO) agent on the Atari Breakout game in OpenAI Gym. It runs the game environments on multiple processes to sample efficiently.

from typing import Dict

import numpy as np
import torch
from torch import nn
from torch import optim
from torch.distributions import Categorical

from labml import monit, tracker, logger, experiment
from labml_helpers.module import Module
from labml_nn.rl.game import Worker
from labml_nn.rl.ppo import ClippedPPOLoss, ClippedValueFunctionLoss
from labml_nn.rl.ppo.gae import GAE

Select device

if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

Model

class Model(Module):
    def __init__(self):
        super().__init__()

The first convolution layer takes an 84x84 frame and produces a 20x20 frame

        self.conv1 = nn.Conv2d(in_channels=4, out_channels=32, kernel_size=8, stride=4)

The second convolution layer takes a 20x20 frame and produces a 9x9 frame

        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)

The third convolution layer takes a 9x9 frame and produces a 7x7 frame

        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)
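These frame sizes follow from the standard convolution output-size formula with no padding, $\text{out} = \lfloor (\text{in} - \text{kernel}) / \text{stride} \rfloor + 1$: $\lfloor (84 - 8)/4 \rfloor + 1 = 20$, $\lfloor (20 - 4)/2 \rfloor + 1 = 9$, and $\lfloor (9 - 3)/1 \rfloor + 1 = 7$, so the flattened feature size entering the linear layer below is $7 \times 7 \times 64 = 3136$.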

A fully connected layer takes the flattened frame from the third convolution layer and outputs 512 features

        self.lin = nn.Linear(in_features=7 * 7 * 64, out_features=512)

A fully connected layer to get logits for $\pi$

        self.pi_logits = nn.Linear(in_features=512, out_features=4)

A fully connected layer to get the value function

        self.value = nn.Linear(in_features=512, out_features=1)

        self.activation = nn.ReLU()

    def __call__(self, obs: torch.Tensor):
        h = self.activation(self.conv1(obs))
        h = self.activation(self.conv2(h))
        h = self.activation(self.conv3(h))
        h = h.reshape((-1, 7 * 7 * 64))

        h = self.activation(self.lin(h))

        pi = Categorical(logits=self.pi_logits(h))
        value = self.value(h).reshape(-1)

        return pi, value

Scale observations from [0, 255] to [0, 1]

def obs_to_torch(obs: np.ndarray) -> torch.Tensor:
    return torch.tensor(obs, dtype=torch.float32, device=device) / 255.

Trainer

class Trainer:
    def __init__(self):

Configurations

number of updates

        self.updates = 10000

number of epochs to train the model with sampled data

        self.epochs = 4

number of worker processes

        self.n_workers = 8

number of steps to run on each process for a single update

        self.worker_steps = 128

number of mini batches

        self.n_mini_batch = 4

total number of samples for a single update

        self.batch_size = self.n_workers * self.worker_steps

size of a mini batch

        self.mini_batch_size = self.batch_size // self.n_mini_batch
        assert (self.batch_size % self.n_mini_batch == 0)
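With the defaults above, each update collects $8 \times 128 = 1024$ samples, split into mini batches of $1024 / 4 = 256$ samples.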

Initialize

create workers

        self.workers = [Worker(47 + i) for i in range(self.n_workers)]

initialize tensors for observations

        self.obs = np.zeros((self.n_workers, 4, 84, 84), dtype=np.uint8)
        for worker in self.workers:
            worker.child.send(("reset", None))
        for i, worker in enumerate(self.workers):
            self.obs[i] = worker.child.recv()

model

        self.model = Model().to(device)

optimizer

        self.optimizer = optim.Adam(self.model.parameters(), lr=2.5e-4)

GAE with $\gamma = 0.99$ and $\lambda = 0.95$

        self.gae = GAE(self.n_workers, self.worker_steps, 0.99, 0.95)
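The GAE computation itself lives in labml_nn.rl.ppo.gae. Purely as orientation, a minimal sketch of the same backward recursion might look like the following (gae_sketch and lambda_ are placeholder names, not part of that module):

def gae_sketch(done: np.ndarray, rewards: np.ndarray, values: np.ndarray,
               gamma: float = 0.99, lambda_: float = 0.95) -> np.ndarray:
    # done, rewards have shape [n_workers, worker_steps]; values has [n_workers, worker_steps + 1]
    n_workers, worker_steps = rewards.shape
    advantages = np.zeros((n_workers, worker_steps), dtype=np.float32)
    last_advantage = np.zeros(n_workers, dtype=np.float32)
    for t in reversed(range(worker_steps)):
        # zero out bootstrapping across episode boundaries
        mask = 1.0 - done[:, t]
        # TD error: delta_t = r_t + gamma * V(s_{t+1}) - V(s_t)
        delta = rewards[:, t] + gamma * values[:, t + 1] * mask - values[:, t]
        # A_t = delta_t + gamma * lambda * A_{t+1}
        last_advantage = delta + gamma * lambda_ * mask * last_advantage
        advantages[:, t] = last_advantage
    return advantages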

PPO Loss

        self.ppo_loss = ClippedPPOLoss()
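ClippedPPOLoss implements the clipped surrogate objective $\mathcal{L}^{CLIP}$ from the PPO paper. As a hedged sketch of the core computation (an illustrative function, not the actual class):

def clipped_ppo_loss_sketch(log_pi: torch.Tensor, sampled_log_pi: torch.Tensor,
                            advantage: torch.Tensor, clip: float) -> torch.Tensor:
    # probability ratio r_t = pi_theta(a_t|s_t) / pi_theta_old(a_t|s_t)
    ratio = torch.exp(log_pi - sampled_log_pi)
    # take the minimum of the unclipped and clipped surrogate terms
    clipped_ratio = ratio.clamp(min=1.0 - clip, max=1.0 + clip)
    surrogate = torch.min(ratio * advantage, clipped_ratio * advantage)
    # negate so that minimizing this loss maximizes the surrogate objective
    return -surrogate.mean()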

Value Loss

        self.value_loss = ClippedValueFunctionLoss()
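ClippedValueFunctionLoss clips how far the new value estimate can move from the value recorded when the samples were collected. A minimal sketch of that idea (the function name and the exact scaling are illustrative assumptions, not necessarily the class's exact behaviour):

def clipped_value_loss_sketch(value: torch.Tensor, sampled_value: torch.Tensor,
                              sampled_return: torch.Tensor, clip: float) -> torch.Tensor:
    # keep the new value prediction within +/- clip of the old prediction
    clipped_value = sampled_value + (value - sampled_value).clamp(min=-clip, max=clip)
    # element-wise maximum of the clipped and unclipped squared errors
    vf_loss = torch.max((value - sampled_return) ** 2, (clipped_value - sampled_return) ** 2)
    return 0.5 * vf_loss.mean()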

Sample data with current policy

    def sample(self) -> Dict[str, torch.Tensor]:
        rewards = np.zeros((self.n_workers, self.worker_steps), dtype=np.float32)
        actions = np.zeros((self.n_workers, self.worker_steps), dtype=np.int32)
        done = np.zeros((self.n_workers, self.worker_steps), dtype=bool)
        obs = np.zeros((self.n_workers, self.worker_steps, 4, 84, 84), dtype=np.uint8)
        log_pis = np.zeros((self.n_workers, self.worker_steps), dtype=np.float32)
        values = np.zeros((self.n_workers, self.worker_steps + 1), dtype=np.float32)

        with torch.no_grad():

sample worker_steps from each worker

            for t in range(self.worker_steps):

self.obs keeps track of the last observation from each worker, which is the input for the model to sample the next action

                obs[:, t] = self.obs

sample actions from $\pi_{\theta_{OLD}}$ for each worker; this returns arrays of size n_workers

                pi, v = self.model(obs_to_torch(self.obs))
                values[:, t] = v.cpu().numpy()
                a = pi.sample()
                actions[:, t] = a.cpu().numpy()
                log_pis[:, t] = pi.log_prob(a).cpu().numpy()

run sampled actions on each worker

                for w, worker in enumerate(self.workers):
                    worker.child.send(("step", actions[w, t]))

                for w, worker in enumerate(self.workers):

get results after executing the actions

                    self.obs[w], rewards[w, t], done[w, t], info = worker.child.recv()

collect episode info, which is available if an episode finished; this includes total reward and length of the episode - look at Game to see how it works.

                    if info:
                        tracker.add('reward', info['reward'])
                        tracker.add('length', info['length'])

Get the value after the final step

            _, v = self.model(obs_to_torch(self.obs))
            values[:, self.worker_steps] = v.cpu().numpy()

calculate advantages

        advantages = self.gae(done, rewards, values)
        samples = {
            'obs': obs,
            'actions': actions,
            'values': values[:, :-1],
            'log_pis': log_pis,
            'advantages': advantages
        }

samples are currently in a [workers, time_step] table; we flatten them for training

        samples_flat = {}
        for k, v in samples.items():
            v = v.reshape(v.shape[0] * v.shape[1], *v.shape[2:])
            if k == 'obs':
                samples_flat[k] = obs_to_torch(v)
            else:
                samples_flat[k] = torch.tensor(v, device=device)

        return samples_flat

Train the model based on samples

    def train(self, samples: Dict[str, torch.Tensor], learning_rate: float, clip_range: float):

It learns faster with a higher number of epochs, but becomes a little unstable; that is, the average episode reward does not monotonically increase over time. Reducing the clipping range might solve this.

        for _ in range(self.epochs):

shuffle for each epoch

            indexes = torch.randperm(self.batch_size)

for each mini batch

            for start in range(0, self.batch_size, self.mini_batch_size):

get mini batch

                end = start + self.mini_batch_size
                mini_batch_indexes = indexes[start: end]
                mini_batch = {}
                for k, v in samples.items():
                    mini_batch[k] = v[mini_batch_indexes]

train

                loss = self._calc_loss(clip_range=clip_range,
                                       samples=mini_batch)

Set learning rate

                for pg in self.optimizer.param_groups:
                    pg['lr'] = learning_rate

Zero out the previously calculated gradients

                self.optimizer.zero_grad()

Calculate gradients

                loss.backward()

Clip gradients

                torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=0.5)

Update parameters based on gradients

                self.optimizer.step()

Normalize advantage function

    @staticmethod
    def _normalize(adv: torch.Tensor):
        return (adv - adv.mean()) / (adv.std() + 1e-8)

Calculate total loss

    def _calc_loss(self, samples: Dict[str, torch.Tensor], clip_range: float) -> torch.Tensor:

$R_t$ returns sampled from $\pi_{\theta_{OLD}}$

        sampled_return = samples['values'] + samples['advantages']

$\bar{A}_t = \frac{\hat{A}_t - \mu(\hat{A}_t)}{\sigma(\hat{A}_t)}$, where $\hat{A}_t$ are the advantages sampled from $\pi_{\theta_{OLD}}$. Refer to the sample function in the Trainer class above for the calculation of $\hat{A}_t$.

        sampled_normalized_advantage = self._normalize(samples['advantages'])

Sampled observations are fed into the model to get $\pi_\theta(a_t|s_t)$ and $V^{\pi_\theta}(s_t)$; we are treating observations as state

        pi, value = self.model(samples['obs'])

$\log \pi_\theta (a_t|s_t)$, where $a_t$ are the actions sampled from $\pi_{\theta_{OLD}}$

        log_pi = pi.log_prob(samples['actions'])

Calculate policy loss

        policy_loss = self.ppo_loss(log_pi, samples['log_pis'], sampled_normalized_advantage, clip_range)

Calculate Entropy Bonus

$\mathcal{L}^{EB}(\theta) = \mathbb{E}\Bigl[ S\bigl[\pi_\theta\bigr] (s_t) \Bigr]$

        entropy_bonus = pi.entropy()
        entropy_bonus = entropy_bonus.mean()

Calculate value function loss

        value_loss = self.value_loss(value, samples['values'], sampled_return, clip_range)

$\mathcal{L}^{CLIP+VF+EB} (\theta) = \mathcal{L}^{CLIP} (\theta) + c_1 \mathcal{L}^{VF} (\theta) - c_2 \mathcal{L}^{EB}(\theta)$

        loss = policy_loss + 0.5 * value_loss - 0.01 * entropy_bonus

for monitoring

        approx_kl_divergence = .5 * ((samples['log_pis'] - log_pi) ** 2).mean()

Add to tracker

        tracker.add({'policy_reward': -policy_loss,
                     'value_loss': value_loss,
                     'entropy_bonus': entropy_bonus,
                     'kl_div': approx_kl_divergence,
                     'clip_fraction': self.ppo_loss.clip_fraction})

        return loss

Run training loop

    def run_training_loop(self):

track information (reward and length) from the last 100 episodes

        tracker.set_queue('reward', 100, True)
        tracker.set_queue('length', 100, True)

        for update in monit.loop(self.updates):
            progress = update / self.updates

decreasing learning_rate and clip_range $\epsilon$

            learning_rate = 2.5e-4 * (1 - progress)
            clip_range = 0.1 * (1 - progress)

sample with current policy

            samples = self.sample()

train the model

            self.train(samples, learning_rate, clip_range)

Save tracked indicators.

            tracker.save()

Add a new line to the screen periodically

            if (update + 1) % 1_000 == 0:
                logger.log()

Destroy

Stop the workers

    def destroy(self):
        for worker in self.workers:
            worker.child.send(("close", None))

def main():

Create the experiment

    experiment.create(name='ppo')

Initialize the trainer

    m = Trainer()

Run and monitor the experiment

    with experiment.start():
        m.run_training_loop()

Stop the workers

    m.destroy()

Run it

if __name__ == "__main__":
    main()