This experiment trains a Proximal Policy Optimization (PPO) agent on the Atari Breakout game with OpenAI Gym. It runs the game environments on multiple processes to sample efficiently.
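The Worker class (and the Game environment wrapper it drives) lives in labml_nn.rl.game and is not listed here. The following is only a minimal sketch of the pattern, assuming Game can be constructed from a seed and exposes reset() and a step() that returns (observation, reward, done, info) as the trainer below expects; the names and details here are illustrative, not the actual implementation.

import multiprocessing
import multiprocessing.connection

from labml_nn.rl.game import Game


def worker_process(remote: multiprocessing.connection.Connection, seed: int):
    # Illustrative sketch: each child process owns one game and serves commands from the pipe
    game = Game(seed)
    while True:
        cmd, data = remote.recv()
        if cmd == "step":
            # Assumed to return (observation, reward, done, info), matching what sample() unpacks
            remote.send(game.step(data))
        elif cmd == "reset":
            remote.send(game.reset())
        elif cmd == "close":
            remote.close()
            break


class Worker:
    def __init__(self, seed: int):
        # The trainer keeps `self.child` and sends ("reset", None), ("step", action)
        # and ("close", None); the other end of the pipe goes to the child process
        self.child, parent = multiprocessing.Pipe()
        self.process = multiprocessing.Process(target=worker_process, args=(parent, seed))
        self.process.start()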
from typing import Dict

import numpy as np
import torch
from torch import nn
from torch import optim
from torch.distributions import Categorical

from labml import monit, tracker, logger, experiment
from labml_helpers.module import Module
from labml_nn.rl.game import Worker
from labml_nn.rl.ppo import ClippedPPOLoss, ClippedValueFunctionLoss
from labml_nn.rl.ppo.gae import GAE

Select device
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")


class Model(Module):
    def __init__(self):
        super().__init__()

The first convolution layer takes an 84x84 frame and produces a 20x20 frame

        self.conv1 = nn.Conv2d(in_channels=4, out_channels=32, kernel_size=8, stride=4)

The second convolution layer takes a 20x20 frame and produces a 9x9 frame

        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)

The third convolution layer takes a 9x9 frame and produces a 7x7 frame

        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)

A fully connected layer takes the flattened frame from the third convolution layer and outputs 512 features

        self.lin = nn.Linear(in_features=7 * 7 * 64, out_features=512)

A fully connected layer to get logits for $\pi$

        self.pi_logits = nn.Linear(in_features=512, out_features=4)

A fully connected layer to get the value function

        self.value = nn.Linear(in_features=512, out_features=1)

        self.activation = nn.ReLU()

    def __call__(self, obs: torch.Tensor):
        h = self.activation(self.conv1(obs))
        h = self.activation(self.conv2(h))
        h = self.activation(self.conv3(h))
        h = h.reshape((-1, 7 * 7 * 64))

        h = self.activation(self.lin(h))

        pi = Categorical(logits=self.pi_logits(h))
        value = self.value(h).reshape(-1)

        return pi, value
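As a quick, illustrative check of the frame sizes quoted above (this snippet and its names are not part of the original experiment), a dummy batch of stacked 84x84 observations passed through the model confirms the 84 to 20 to 9 to 7 reduction and the four action logits for Breakout:

model = Model()

# Two dummy observations, each a stack of 4 frames of 84x84 pixels
dummy_obs = torch.zeros(2, 4, 84, 84)
pi, value = model(dummy_obs)

# Each convolution shrinks the frame as floor((size - kernel) / stride) + 1:
# (84 - 8) / 4 + 1 = 20,  (20 - 4) / 2 + 1 = 9,  (9 - 3) / 1 + 1 = 7,
# so the flattened features are 7 * 7 * 64 = 3136, matching `self.lin`.
assert pi.logits.shape == (2, 4)  # one logit per Breakout action
assert value.shape == (2,)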
Scale observations from [0, 255] to [0, 1]

def obs_to_torch(obs: np.ndarray) -> torch.Tensor:
    return torch.tensor(obs, dtype=torch.float32, device=device) / 255.


class Trainer:
    def __init__(self):

number of updates
        self.updates = 10000

number of epochs to train the model with sampled data

        self.epochs = 4

number of worker processes

        self.n_workers = 8

number of steps to run on each process for a single update

        self.worker_steps = 128

number of mini batches

        self.n_mini_batch = 4

total number of samples for a single update

        self.batch_size = self.n_workers * self.worker_steps

size of a mini batch

        self.mini_batch_size = self.batch_size // self.n_mini_batch
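        # (Illustrative arithmetic, not in the original: with the defaults above,
        #  batch_size = 8 * 128 = 1024 samples per update and
        #  mini_batch_size = 1024 // 4 = 256 samples per gradient step.)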
        assert (self.batch_size % self.n_mini_batch == 0)

create workers

        self.workers = [Worker(47 + i) for i in range(self.n_workers)]

initialize tensors for observations
        self.obs = np.zeros((self.n_workers, 4, 84, 84), dtype=np.uint8)
        for worker in self.workers:
            worker.child.send(("reset", None))
        for i, worker in enumerate(self.workers):
            self.obs[i] = worker.child.recv()

model

        self.model = Model().to(device)

optimizer

        self.optimizer = optim.Adam(self.model.parameters(), lr=2.5e-4)

GAE with $\gamma = 0.99$ and $\lambda = 0.95$

        self.gae = GAE(self.n_workers, self.worker_steps, 0.99, 0.95)

PPO Loss

        self.ppo_loss = ClippedPPOLoss()

Value Loss

        self.value_loss = ClippedValueFunctionLoss()
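The GAE class used above is implemented in labml_nn.rl.ppo.gae and not reproduced here. As a hedged sketch of what a generalized advantage estimator with this (n_workers, worker_steps) layout typically computes, where $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ is accumulated backwards with decay $\gamma\lambda$ and reset at episode boundaries, it might look roughly like the following (the function name is hypothetical; the real class takes $\gamma$ and $\lambda$ as constructor arguments):

def gae_sketch(done: np.ndarray, rewards: np.ndarray, values: np.ndarray,
               gamma: float = 0.99, lambda_: float = 0.95) -> np.ndarray:
    # Illustrative stand-in for labml_nn.rl.ppo.gae.GAE, not the real class.
    # `values` has `worker_steps + 1` entries per worker so the last step can bootstrap.
    n_workers, worker_steps = rewards.shape
    advantages = np.zeros((n_workers, worker_steps), dtype=np.float32)
    last_advantage = np.zeros(n_workers, dtype=np.float32)
    for t in reversed(range(worker_steps)):
        # Mask out the bootstrap value and the running advantage where the episode ended
        mask = 1.0 - done[:, t]
        delta = rewards[:, t] + gamma * values[:, t + 1] * mask - values[:, t]
        last_advantage = delta + gamma * lambda_ * mask * last_advantage
        advantages[:, t] = last_advantage
    return advantages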
    def sample(self) -> Dict[str, torch.Tensor]:
        rewards = np.zeros((self.n_workers, self.worker_steps), dtype=np.float32)
        actions = np.zeros((self.n_workers, self.worker_steps), dtype=np.int32)
        done = np.zeros((self.n_workers, self.worker_steps), dtype=bool)
        obs = np.zeros((self.n_workers, self.worker_steps, 4, 84, 84), dtype=np.uint8)
        log_pis = np.zeros((self.n_workers, self.worker_steps), dtype=np.float32)
        values = np.zeros((self.n_workers, self.worker_steps + 1), dtype=np.float32)

        with torch.no_grad():

sample worker_steps from each worker

            for t in range(self.worker_steps):

self.obs keeps track of the last observation from each worker, which is the input for the model to sample the next action

                obs[:, t] = self.obs

sample actions from $\pi_{\theta_{OLD}}$ for each worker; this returns arrays of size n_workers

                pi, v = self.model(obs_to_torch(self.obs))
                values[:, t] = v.cpu().numpy()
                a = pi.sample()
                actions[:, t] = a.cpu().numpy()
                log_pis[:, t] = pi.log_prob(a).cpu().numpy()

run sampled actions on each worker
                for w, worker in enumerate(self.workers):
                    worker.child.send(("step", actions[w, t]))

                for w, worker in enumerate(self.workers):

get results after executing the actions

                    self.obs[w], rewards[w, t], done[w, t], info = worker.child.recv()

collect episode info, which is available if an episode finished; this includes the total reward and length of the episode (look at Game to see how it works)

                    if info:
                        tracker.add('reward', info['reward'])
                        tracker.add('length', info['length'])

Get value after the final step

            _, v = self.model(obs_to_torch(self.obs))
            values[:, self.worker_steps] = v.cpu().numpy()

calculate advantages
        advantages = self.gae(done, rewards, values)

        samples = {
            'obs': obs,
            'actions': actions,
            'values': values[:, :-1],
            'log_pis': log_pis,
            'advantages': advantages
        }

samples are currently in a [workers, time_step] table; we should flatten them for training

        samples_flat = {}
        for k, v in samples.items():
            v = v.reshape(v.shape[0] * v.shape[1], *v.shape[2:])
            if k == 'obs':
                samples_flat[k] = obs_to_torch(v)
            else:
                samples_flat[k] = torch.tensor(v, device=device)

        return samples_flat

    def train(self, samples: Dict[str, torch.Tensor], learning_rate: float, clip_range: float):

It learns faster with a higher number of epochs, but becomes a little unstable; that is, the average episode reward does not monotonically increase over time. Maybe reducing the clipping range would solve this.
        for _ in range(self.epochs):

shuffle for each epoch

            indexes = torch.randperm(self.batch_size)

for each mini batch

            for start in range(0, self.batch_size, self.mini_batch_size):

get mini batch

                end = start + self.mini_batch_size
                mini_batch_indexes = indexes[start: end]
                mini_batch = {}
                for k, v in samples.items():
                    mini_batch[k] = v[mini_batch_indexes]

train

                loss = self._calc_loss(clip_range=clip_range,
                                       samples=mini_batch)

Set learning rate

                for pg in self.optimizer.param_groups:
                    pg['lr'] = learning_rate

Zero out the previously calculated gradients

                self.optimizer.zero_grad()

Calculate gradients

                loss.backward()

Clip gradients

                torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=0.5)

Update parameters based on gradients

                self.optimizer.step()

    @staticmethod
    def _normalize(adv: torch.Tensor):
        return (adv - adv.mean()) / (adv.std() + 1e-8)

    def _calc_loss(self, samples: Dict[str, torch.Tensor], clip_range: float) -> torch.Tensor:

$R_t$ returns sampled from $\pi_{\theta_{OLD}}$

        sampled_return = samples['values'] + samples['advantages']

$\bar{A_t} = \frac{\hat{A_t} - \mu(\hat{A_t})}{\sigma(\hat{A_t})}$, where $\hat{A}_t$ are the advantages sampled from $\pi_{\theta_{OLD}}$. Refer to the sampling function in the Trainer class above for the calculation of $\hat{A}_t$.

        sampled_normalized_advantage = self._normalize(samples['advantages'])

Sampled observations are fed into the model to get $\pi_\theta(a_t|s_t)$ and $V^{\pi_\theta}(s_t)$; we are treating observations as the state

        pi, value = self.model(samples['obs'])

$-\log \pi_\theta (a_t|s_t)$, where $a_t$ are the actions sampled from $\pi_{\theta_{OLD}}$

        log_pi = pi.log_prob(samples['actions'])

Calculate policy loss

        policy_loss = self.ppo_loss(log_pi, samples['log_pis'], sampled_normalized_advantage, clip_range)

Calculate Entropy Bonus
$\mathcal{L}^{EB}(\theta) = \mathbb{E}\Bigl[ S\bigl[\pi_\theta\bigr] (s_t) \Bigr]$

        entropy_bonus = pi.entropy()
        entropy_bonus = entropy_bonus.mean()

Calculate value function loss

        value_loss = self.value_loss(value, samples['values'], sampled_return, clip_range)

$\mathcal{L}^{CLIP+VF+EB} (\theta) = \mathcal{L}^{CLIP} (\theta) + c_1 \mathcal{L}^{VF} (\theta) - c_2 \mathcal{L}^{EB}(\theta)$

        loss = policy_loss + 0.5 * value_loss - 0.01 * entropy_bonus

for monitoring

        approx_kl_divergence = .5 * ((samples['log_pis'] - log_pi) ** 2).mean()

Add to tracker

        tracker.add({'policy_reward': -policy_loss,
                     'value_loss': value_loss,
                     'entropy_bonus': entropy_bonus,
                     'kl_div': approx_kl_divergence,
                     'clip_fraction': self.ppo_loss.clip_fraction})

        return loss
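ClippedPPOLoss and ClippedValueFunctionLoss come from labml_nn.rl.ppo and are not listed here. The two functions below are only a rough, hypothetical sketch of what they compute (the clipped surrogate objective and the clipped value-function loss); the actual classes also track statistics such as the clip_fraction used above, and their exact reductions may differ.

def clipped_ppo_loss_sketch(log_pi: torch.Tensor, sampled_log_pi: torch.Tensor,
                            advantage: torch.Tensor, clip: float) -> torch.Tensor:
    # Illustrative stand-in, not the real ClippedPPOLoss.
    # r_t(theta) = pi_theta(a_t|s_t) / pi_theta_old(a_t|s_t)
    ratio = torch.exp(log_pi - sampled_log_pi)
    clipped_ratio = ratio.clamp(min=1.0 - clip, max=1.0 + clip)
    # Negated because the surrogate reward is maximized while the loss is minimized
    return -torch.min(ratio * advantage, clipped_ratio * advantage).mean()


def clipped_value_loss_sketch(value: torch.Tensor, sampled_value: torch.Tensor,
                              sampled_return: torch.Tensor, clip: float) -> torch.Tensor:
    # Illustrative stand-in, not the real ClippedValueFunctionLoss.
    # Keep the new value estimate within `clip` of the estimate used during sampling
    clipped_value = sampled_value + (value - sampled_value).clamp(min=-clip, max=clip)
    vf_loss = torch.max((value - sampled_return) ** 2, (clipped_value - sampled_return) ** 2)
    return 0.5 * vf_loss.mean()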
    def run_training_loop(self):

last 100 episode information

        tracker.set_queue('reward', 100, True)
        tracker.set_queue('length', 100, True)

        for update in monit.loop(self.updates):
            progress = update / self.updates

decreasing learning_rate and clip_range $\epsilon$
            learning_rate = 2.5e-4 * (1 - progress)
            clip_range = 0.1 * (1 - progress)
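            # (Illustrative numbers, not in the original: at the first update progress = 0,
            #  so learning_rate = 2.5e-4 and clip_range = 0.1; halfway through training they
            #  have decayed linearly to 1.25e-4 and 0.05, approaching zero by the last update.)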
sample with current policy

            samples = self.sample()

train the model

            self.train(samples, learning_rate, clip_range)

Save tracked indicators.

            tracker.save()

Add a new line to the screen periodically

            if (update + 1) % 1_000 == 0:
                logger.log()

    def destroy(self):
        for worker in self.workers:
            worker.child.send(("close", None))


def main():

Create the experiment

    experiment.create(name='ppo')

Initialize the trainer

    m = Trainer()

Run and monitor the experiment

    with experiment.start():
        m.run_training_loop()

Stop the workers

    m.destroy()


if __name__ == "__main__":
    main()