import multiprocessing
import multiprocessing.connection

import cv2
import gym
import numpy as np

# This is a wrapper for the OpenAI Gym game environment. We do a few things here:
# the observation is a tensor of size (4, 84, 84) — four frames (images of the
# game screen) stacked on the first axis, i.e. each channel is a frame.
class Game:
    """
    Wrapper around the ``BreakoutNoFrameskip-v4`` Gym environment.

    Observations are stacks of the four most recent processed frames,
    shaped ``(4, 84, 84)``.  Losing a life is treated as the end of an
    episode.
    """

    def __init__(self, seed: int):
        # Create and seed the underlying Gym environment.
        self.env = gym.make('BreakoutNoFrameskip-v4')
        self.env.seed(seed)

        # Stack of the 4 most recent processed frames (the observation).
        self.obs_4 = np.zeros((4, 84, 84))

        # Buffer for the last 2 raw frames of a step; their pixel-wise
        # max is taken to remove Atari sprite flicker.
        self.obs_2_max = np.zeros((2, 84, 84))

        # Per-step rewards accumulated over the current episode.
        self.rewards = []

        # Lives remaining, used to detect when a life is lost.
        self.lives = 0

    def step(self, action):
        """
        Execute ``action`` for 4 environment time steps.

        Returns a tuple ``(observation, reward, done, episode_info)``:

        * ``observation``: stacked 4 frames (this frame and the frames
          for the last 3 actions)
        * ``reward``: total reward while the action was executed
        * ``done``: whether the episode finished (a life was lost)
        * ``episode_info``: episode information if it completed,
          otherwise ``None``
        """
        reward = 0.
        done = None

        # Repeat the chosen action for 4 frames.
        for t in range(4):
            # Execute the action in the Gym environment.
            obs, step_reward, done, info = self.env.step(action)

            # Only the final two frames are kept for the flicker-removing max.
            if t >= 2:
                self.obs_2_max[t % 2] = self._process_obs(obs)

            reward += step_reward

            # Treat a lost life as the end of an episode.
            lives = self.env.unwrapped.ale.lives()
            if lives < self.lives:
                done = True
                break

        # Record the reward obtained for this step.
        self.rewards.append(reward)

        if done:
            # Episode over: summarize it and reset for the next one.
            episode_info = {"reward": sum(self.rewards), "length": len(self.rewards)}
            self.reset()
        else:
            episode_info = None

            # Max over the last two frames, pushed onto the 4-frame stack.
            obs = self.obs_2_max.max(axis=0)
            self.obs_4 = np.roll(self.obs_4, shift=-1, axis=0)
            self.obs_4[-1] = obs

        return self.obs_4, reward, done, episode_info

    def reset(self):
        """
        Reset the environment and all caches; return the initial
        stacked observation (the first frame repeated 4 times).
        """
        obs = self.env.reset()

        # Fill the whole stack with the processed first frame.
        obs = self._process_obs(obs)
        for idx in range(4):
            self.obs_4[idx] = obs
        self.rewards = []

        self.lives = self.env.unwrapped.ale.lives()

        return self.obs_4

    @staticmethod
    def _process_obs(obs):
        """Convert an RGB frame to grayscale and resize it to 84x84."""
        obs = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        obs = cv2.resize(obs, (84, 84), interpolation=cv2.INTER_AREA)
        return obs
def worker_process(remote: multiprocessing.connection.Connection, seed: int):
    """
    Run a ``Game`` inside this process, serving commands from ``remote``.

    Recognized commands (sent as ``(command, payload)`` tuples):

    * ``"step"`` — execute ``payload`` as an action, send back the result
    * ``"reset"`` — reset the game, send back the initial observation
    * ``"close"`` — close the connection and exit
    """
    # Create the game for this worker.
    game = Game(seed)

    # Wait for instructions over the connection and execute them.
    while True:
        command, payload = remote.recv()
        if command == "step":
            remote.send(game.step(payload))
        elif command == "reset":
            remote.send(game.reset())
        elif command == "close":
            remote.close()
            break
        else:
            raise NotImplementedError
class Worker:
    """
    Creates a new worker and runs it in a separate process.

    ``self.child`` is the parent-process end of the pipe used to talk to
    the worker; ``self.process`` is the spawned worker process.
    """

    def __init__(self, seed):
        self.child, parent = multiprocessing.Pipe()
        self.process = multiprocessing.Process(target=worker_process, args=(parent, seed))
        self.process.start()
        # Close our copy of the worker-side pipe end.  The child process
        # holds its own copy; keeping this one open would leak a file
        # descriptor and prevent recv() on `self.child` from raising
        # EOFError if the worker dies.
        parent.close()