This experiment trains a Proximal Policy Optimization (PPO) agent on the Atari Breakout game using OpenAI Gym. It runs the game environments on multiple processes to sample efficiently.
from typing import Dict

import numpy as np
import torch
from torch import nn
from torch import optim
from torch.distributions import Categorical

from labml import monit, tracker, logger, experiment
from labml_helpers.module import Module
from labml_nn.rl.game import Worker
from labml_nn.rl.ppo import ClippedPPOLoss, ClippedValueFunctionLoss
from labml_nn.rl.ppo.gae import GAE
Select device
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
class Model(Module):
    def __init__(self):
        super().__init__()
The first convolution layer takes an 84x84 frame and produces a 20x20 frame
        self.conv1 = nn.Conv2d(in_channels=4, out_channels=32, kernel_size=8, stride=4)
The second convolution layer takes a 20x20 frame and produces a 9x9 frame
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)
The third convolution layer takes a 9x9 frame and produces a 7x7 frame
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)
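These frame sizes follow from the convolution output formula $o = \big\lfloor \frac{i - k}{s} \big\rfloor + 1$ with no padding: conv1 gives $\lfloor (84 - 8) / 4 \rfloor + 1 = 20$, conv2 gives $\lfloor (20 - 4) / 2 \rfloor + 1 = 9$, and conv3 gives $\lfloor (9 - 3) / 1 \rfloor + 1 = 7$.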
A fully connected layer takes the flattened frame from the third convolution layer, and outputs 512 features
        self.lin = nn.Linear(in_features=7 * 7 * 64, out_features=512)
A fully connected layer to get logits for $\pi$
        self.pi_logits = nn.Linear(in_features=512, out_features=4)
A fully connected layer to get the value function
        self.value = nn.Linear(in_features=512, out_features=1)

        self.activation = nn.ReLU()
    def __call__(self, obs: torch.Tensor):
        h = self.activation(self.conv1(obs))
        h = self.activation(self.conv2(h))
        h = self.activation(self.conv3(h))
        h = h.reshape((-1, 7 * 7 * 64))

        h = self.activation(self.lin(h))

        pi = Categorical(logits=self.pi_logits(h))
        value = self.value(h).reshape(-1)

        return pi, value
Scale observations from [0, 255] to [0, 1]
def obs_to_torch(obs: np.ndarray) -> torch.Tensor:
    return torch.tensor(obs, dtype=torch.float32, device=device) / 255.
class Trainer:
    def __init__(self):
number of updates
        self.updates = 10000
number of epochs to train the model with sampled data
        self.epochs = 4
number of worker processes
        self.n_workers = 8
number of steps to run on each process for a single update
        self.worker_steps = 128
number of mini batches
        self.n_mini_batch = 4
total number of samples for a single update
        self.batch_size = self.n_workers * self.worker_steps
size of a mini batch
        self.mini_batch_size = self.batch_size // self.n_mini_batch
        assert (self.batch_size % self.n_mini_batch == 0)
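With these defaults that works out to $8 \times 128 = 1024$ samples per update, split into 4 mini batches of $1024 / 4 = 256$ samples each, so the assertion holds.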
create workers
        self.workers = [Worker(47 + i) for i in range(self.n_workers)]
initialize tensors for observations
        self.obs = np.zeros((self.n_workers, 4, 84, 84), dtype=np.uint8)
        for worker in self.workers:
            worker.child.send(("reset", None))
        for i, worker in enumerate(self.workers):
            self.obs[i] = worker.child.recv()
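The trainer communicates with each worker over a pipe: it sends ("reset", None), ("step", action) and ("close", None) messages, and reads back either the initial observation or the (observation, reward, done, info) result of a step. As a rough sketch only (this is not the actual labml_nn.rl.game.Worker, and Game here is a hypothetical stand-in for the Breakout environment wrapper), the child process could run a loop like this:
def worker_process(remote, seed):
    # `Game` is a hypothetical wrapper around the Atari environment
    game = Game(seed)
    while True:
        cmd, data = remote.recv()
        if cmd == "reset":
            # reply with the initial stacked-frame observation
            remote.send(game.reset())
        elif cmd == "step":
            # reply with (observation, reward, done, info) for the given action
            remote.send(game.step(data))
        elif cmd == "close":
            remote.close()
            break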
model
        self.model = Model().to(device)
optimizer
        self.optimizer = optim.Adam(self.model.parameters(), lr=2.5e-4)
GAE with $\gamma = 0.99$ and $\lambda = 0.95$
        self.gae = GAE(self.n_workers, self.worker_steps, 0.99, 0.95)
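For reference, generalized advantage estimation sums discounted TD errors, $\hat{A}_t = \delta_t + (\gamma\lambda)\,\delta_{t+1} + (\gamma\lambda)^2\,\delta_{t+2} + \dots$ with $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$, truncating the sum at episode boundaries; that is why sample passes the done flags and worker_steps + 1 values to it.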
PPO Loss
        self.ppo_loss = ClippedPPOLoss()
Value Loss
        self.value_loss = ClippedValueFunctionLoss()
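These are the clipped objectives from the PPO paper. Writing $r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{OLD}}(a_t|s_t)}$, the policy loss is the negated clipped surrogate $\mathcal{L}^{CLIP}(\theta) = -\mathbb{E}\Bigl[\min\bigl(r_t(\theta)\,\hat{A}_t,\ \mathrm{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\,\hat{A}_t\bigr)\Bigr]$, and the clipped value loss takes the worse of the clipped and unclipped squared errors, $\mathcal{L}^{VF}(\theta) = \mathbb{E}\Bigl[\max\bigl((V_\theta(s_t) - R_t)^2,\ \bigl(V_{\theta_{OLD}}(s_t) + \mathrm{clip}(V_\theta(s_t) - V_{\theta_{OLD}}(s_t), -\epsilon, +\epsilon) - R_t\bigr)^2\bigr)\Bigr]$.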
    def sample(self) -> Dict[str, torch.Tensor]:
        rewards = np.zeros((self.n_workers, self.worker_steps), dtype=np.float32)
        actions = np.zeros((self.n_workers, self.worker_steps), dtype=np.int32)
        done = np.zeros((self.n_workers, self.worker_steps), dtype=bool)
        obs = np.zeros((self.n_workers, self.worker_steps, 4, 84, 84), dtype=np.uint8)
        log_pis = np.zeros((self.n_workers, self.worker_steps), dtype=np.float32)
        values = np.zeros((self.n_workers, self.worker_steps + 1), dtype=np.float32)

        with torch.no_grad():
sample worker_steps from each worker
            for t in range(self.worker_steps):
self.obs keeps track of the last observation from each worker, which is the input for the model to sample the next action
                obs[:, t] = self.obs
sample actions from $\pi_{\theta_{OLD}}$ for each worker; this returns arrays of size n_workers
                pi, v = self.model(obs_to_torch(self.obs))
                values[:, t] = v.cpu().numpy()
                a = pi.sample()
                actions[:, t] = a.cpu().numpy()
                log_pis[:, t] = pi.log_prob(a).cpu().numpy()
run sampled actions on each worker
                for w, worker in enumerate(self.workers):
                    worker.child.send(("step", actions[w, t]))

                for w, worker in enumerate(self.workers):
get results after executing the actions
                    self.obs[w], rewards[w, t], done[w, t], info = worker.child.recv()
collect episode info, which is available if an episode finished; this includes total reward and length of the episode - look at Game to see how it works
                    if info:
                        tracker.add('reward', info['reward'])
                        tracker.add('length', info['length'])
Get value after the final step
            _, v = self.model(obs_to_torch(self.obs))
            values[:, self.worker_steps] = v.cpu().numpy()
calculate advantages
        advantages = self.gae(done, rewards, values)
        samples = {
            'obs': obs,
            'actions': actions,
            'values': values[:, :-1],
            'log_pis': log_pis,
            'advantages': advantages
        }
samples are currently in [workers, time_step] tables; we should flatten them for training
        samples_flat = {}
        for k, v in samples.items():
            v = v.reshape(v.shape[0] * v.shape[1], *v.shape[2:])
            if k == 'obs':
                samples_flat[k] = obs_to_torch(v)
            else:
                samples_flat[k] = torch.tensor(v, device=device)

        return samples_flat
    def train(self, samples: Dict[str, torch.Tensor], learning_rate: float, clip_range: float):
It learns faster with a higher number of epochs, but becomes a little unstable; that is, the average episode reward does not monotonically increase over time. Reducing the clipping range might solve it.
        for _ in range(self.epochs):
shuffle for each epoch
            indexes = torch.randperm(self.batch_size)
for each mini batch
            for start in range(0, self.batch_size, self.mini_batch_size):
get mini batch
                end = start + self.mini_batch_size
                mini_batch_indexes = indexes[start: end]
                mini_batch = {}
                for k, v in samples.items():
                    mini_batch[k] = v[mini_batch_indexes]
train
                loss = self._calc_loss(clip_range=clip_range, samples=mini_batch)
Set learning rate
                for pg in self.optimizer.param_groups:
                    pg['lr'] = learning_rate
Zero out the previously calculated gradients
                self.optimizer.zero_grad()
Calculate gradients
                loss.backward()
Clip gradients
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=0.5)
Update parameters based on gradients
                self.optimizer.step()

    @staticmethod
    def _normalize(adv: torch.Tensor):
        return (adv - adv.mean()) / (adv.std() + 1e-8)

    def _calc_loss(self, samples: Dict[str, torch.Tensor], clip_range: float) -> torch.Tensor:
$R_t$ returns sampled from $\pi_{\theta_{OLD}}$
        sampled_return = samples['values'] + samples['advantages']
$\bar{A_t} = \frac{\hat{A_t} - \mu(\hat{A_t})}{\sigma(\hat{A_t})}$, where $\hat{A_t}$ is advantages sampled from $\pi_{\theta_{OLD}}$. Refer to the sample function in the Trainer class above for the calculation of $\hat{A}_t$.
        sampled_normalized_advantage = self._normalize(samples['advantages'])
Sampled observations are fed into the model to get $\pi_\theta(a_t|s_t)$ and $V^{\pi_\theta}(s_t)$; we are treating observations as state
        pi, value = self.model(samples['obs'])
$\log \pi_\theta (a_t|s_t)$, where $a_t$ are the actions sampled from $\pi_{\theta_{OLD}}$
        log_pi = pi.log_prob(samples['actions'])
Calculate policy loss
        policy_loss = self.ppo_loss(log_pi, samples['log_pis'], sampled_normalized_advantage, clip_range)
Calculate Entropy Bonus
$\mathcal{L}^{EB}(\theta) = \mathbb{E}\Bigl[ S\bigl[\pi_\theta\bigr] (s_t) \Bigr]$
        entropy_bonus = pi.entropy()
        entropy_bonus = entropy_bonus.mean()
Calculate value function loss
        value_loss = self.value_loss(value, samples['values'], sampled_return, clip_range)
$\mathcal{L}^{CLIP+VF+EB} (\theta) = \mathcal{L}^{CLIP} (\theta) + c_1 \mathcal{L}^{VF} (\theta) - c_2 \mathcal{L}^{EB}(\theta)$, with $c_1 = 0.5$ and $c_2 = 0.01$
        loss = policy_loss + 0.5 * value_loss - 0.01 * entropy_bonus
approximate KL divergence between $\pi_{\theta_{OLD}}$ and $\pi_\theta$, for monitoring
        approx_kl_divergence = .5 * ((samples['log_pis'] - log_pi) ** 2).mean()
Add to tracker
        tracker.add({'policy_reward': -policy_loss,
                     'value_loss': value_loss,
                     'entropy_bonus': entropy_bonus,
                     'kl_div': approx_kl_divergence,
                     'clip_fraction': self.ppo_loss.clip_fraction})

        return loss
    def run_training_loop(self):
last 100 episode information
        tracker.set_queue('reward', 100, True)
        tracker.set_queue('length', 100, True)

        for update in monit.loop(self.updates):
            progress = update / self.updates
decreasing learning_rate and clip_range $\epsilon$
            learning_rate = 2.5e-4 * (1 - progress)
            clip_range = 0.1 * (1 - progress)
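For example, halfway through training (update 5,000 of 10,000, so progress = 0.5) this gives learning_rate = 1.25e-4 and clip_range = 0.05, and both anneal linearly towards zero by the last update.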
sample with current policy
            samples = self.sample()
train the model
            self.train(samples, learning_rate, clip_range)
Save tracked indicators.
            tracker.save()
Add a new line to the screen periodically
            if (update + 1) % 1_000 == 0:
                logger.log()
    def destroy(self):
        for worker in self.workers:
            worker.child.send(("close", None))
def main():
Create the experiment
    experiment.create(name='ppo')
Initialize the trainer
    m = Trainer()
Run and monitor the experiment
    with experiment.start():
        m.run_training_loop()
Stop the workers
    m.destroy()

if __name__ == "__main__":
    main()