mirror of
				https://github.com/containers/podman.git
				synced 2025-10-26 18:54:17 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			198 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			198 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import subprocess
 | |
| import sys
 | |
| import time
 | |
| import unittest
 | |
| 
 | |
| from docker import DockerClient, errors
 | |
| 
 | |
| from test.python.docker import Podman, common, constant
 | |
| 
 | |
| 
 | |
| class TestContainers(unittest.TestCase):
 | |
|     podman = None  # initialized podman configuration for tests
 | |
|     service = None  # podman service instance
 | |
|     topContainerId = ""
 | |
| 
 | |
|     def setUp(self):
 | |
|         super().setUp()
 | |
|         self.client = DockerClient(base_url="tcp://127.0.0.1:8080", timeout=15)
 | |
|         TestContainers.podman.restore_image_from_cache(self.client)
 | |
|         TestContainers.topContainerId = common.run_top_container(self.client)
 | |
|         self.assertIsNotNone(TestContainers.topContainerId)
 | |
| 
 | |
|     def tearDown(self):
 | |
|         common.remove_all_containers(self.client)
 | |
|         common.remove_all_images(self.client)
 | |
|         self.client.close()
 | |
|         return super().tearDown()
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         super().setUpClass()
 | |
|         TestContainers.podman = Podman()
 | |
|         TestContainers.service = TestContainers.podman.open(
 | |
|             "system", "service", "tcp:127.0.0.1:8080", "--time=0"
 | |
|         )
 | |
|         # give the service some time to be ready...
 | |
|         time.sleep(2)
 | |
| 
 | |
|         rc = TestContainers.service.poll()
 | |
|         if rc is not None:
 | |
|             raise subprocess.CalledProcessError(rc, "podman system service")
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         TestContainers.service.terminate()
 | |
|         stdout, stderr = TestContainers.service.communicate(timeout=0.5)
 | |
|         if stdout:
 | |
|             sys.stdout.write("\nContainers Service Stdout:\n" + stdout.decode("utf-8"))
 | |
|         if stderr:
 | |
|             sys.stderr.write("\nContainers Service Stderr:\n" + stderr.decode("utf-8"))
 | |
| 
 | |
|         TestContainers.podman.tear_down()
 | |
|         return super().tearDownClass()
 | |
| 
 | |
|     def test_create_container(self):
 | |
|         # Run a container with detach mode
 | |
|         self.client.containers.create(image="alpine", detach=True)
 | |
|         self.assertEqual(len(self.client.containers.list(all=True)), 2)
 | |
| 
 | |
|     def test_create_network(self):
 | |
|         net = self.client.networks.create("testNetwork", driver="bridge")
 | |
|         ctnr = self.client.containers.create(image="alpine", detach=True)
 | |
| 
 | |
|         #  TODO fix when ready
 | |
|         # This test will not work until all connect|disconnect
 | |
|         # code is fixed.
 | |
|         # net.connect(ctnr)
 | |
| 
 | |
|         # nets = self.client.networks.list(greedy=True)
 | |
|         # self.assertGreaterEqual(len(nets), 1)
 | |
| 
 | |
|         # TODO fix endpoint to include containers
 | |
|         # for n in nets:
 | |
|         #     if n.id == "testNetwork":
 | |
|         #         self.assertEqual(ctnr.id, n.containers)
 | |
|         # self.assertTrue(False, "testNetwork not found")
 | |
| 
 | |
|     def test_start_container(self):
 | |
|         # Podman docs says it should give a 304 but returns with no response
 | |
|         # # Start a already started container should return 304
 | |
|         # response = self.client.api.start(container=TestContainers.topContainerId)
 | |
|         # self.assertEqual(error.exception.response.status_code, 304)
 | |
| 
 | |
|         # Create a new container and validate the count
 | |
|         self.client.containers.create(image=constant.ALPINE, name="container2")
 | |
|         containers = self.client.containers.list(all=True)
 | |
|         self.assertEqual(len(containers), 2)
 | |
| 
 | |
|     def test_start_container_with_random_port_bind(self):
 | |
|         container = self.client.containers.create(image=constant.ALPINE,
 | |
|                                                   name="containerWithRandomBind",
 | |
|                                                   ports={'1234/tcp': None})
 | |
|         containers = self.client.containers.list(all=True)
 | |
|         self.assertTrue(container in containers)
 | |
| 
 | |
|     def test_stop_container(self):
 | |
|         top = self.client.containers.get(TestContainers.topContainerId)
 | |
|         self.assertEqual(top.status, "running")
 | |
| 
 | |
|         # Stop a running container and validate the state
 | |
|         top.stop()
 | |
|         top.reload()
 | |
|         self.assertIn(top.status, ("stopped", "exited"))
 | |
| 
 | |
|     def test_kill_container(self):
 | |
|         top = self.client.containers.get(TestContainers.topContainerId)
 | |
|         self.assertEqual(top.status, "running")
 | |
| 
 | |
|         # Kill a running container and validate the state
 | |
|         top.kill()
 | |
|         top.reload()
 | |
|         self.assertIn(top.status, ("stopped", "exited"))
 | |
| 
 | |
|     def test_restart_container(self):
 | |
|         # Validate the container state
 | |
|         top = self.client.containers.get(TestContainers.topContainerId)
 | |
|         top.stop()
 | |
|         top.reload()
 | |
|         self.assertIn(top.status, ("stopped", "exited"))
 | |
| 
 | |
|         # restart a running container and validate the state
 | |
|         top.restart()
 | |
|         top.reload()
 | |
|         self.assertEqual(top.status, "running")
 | |
| 
 | |
|     def test_remove_container(self):
 | |
|         # Remove container by ID with force
 | |
|         top = self.client.containers.get(TestContainers.topContainerId)
 | |
|         top.remove(force=True)
 | |
|         self.assertEqual(len(self.client.containers.list()), 0)
 | |
| 
 | |
|     def test_remove_container_without_force(self):
 | |
|         # Validate current container count
 | |
|         self.assertTrue(len(self.client.containers.list()), 1)
 | |
| 
 | |
|         # Remove running container should throw error
 | |
|         top = self.client.containers.get(TestContainers.topContainerId)
 | |
|         with self.assertRaises(errors.APIError) as error:
 | |
|             top.remove()
 | |
|         self.assertEqual(error.exception.response.status_code, 500)
 | |
| 
 | |
|         # Remove container by ID without force
 | |
|         top.stop()
 | |
|         top.remove()
 | |
|         self.assertEqual(len(self.client.containers.list()), 0)
 | |
| 
 | |
|     def test_pause_container(self):
 | |
|         # Validate the container state
 | |
|         top = self.client.containers.get(TestContainers.topContainerId)
 | |
|         self.assertEqual(top.status, "running")
 | |
| 
 | |
|         # Pause a running container and validate the state
 | |
|         top.pause()
 | |
|         top.reload()
 | |
|         self.assertEqual(top.status, "paused")
 | |
| 
 | |
|     def test_pause_stopped_container(self):
 | |
|         # Stop the container
 | |
|         top = self.client.containers.get(TestContainers.topContainerId)
 | |
|         top.stop()
 | |
| 
 | |
|         # Pause exited container should throw error
 | |
|         with self.assertRaises(errors.APIError) as error:
 | |
|             top.pause()
 | |
|         self.assertEqual(error.exception.response.status_code, 500)
 | |
| 
 | |
|     def test_unpause_container(self):
 | |
|         top = self.client.containers.get(TestContainers.topContainerId)
 | |
| 
 | |
|         # Validate the container state
 | |
|         top.pause()
 | |
|         top.reload()
 | |
|         self.assertEqual(top.status, "paused")
 | |
| 
 | |
|         # Pause a running container and validate the state
 | |
|         top.unpause()
 | |
|         top.reload()
 | |
|         self.assertEqual(top.status, "running")
 | |
| 
 | |
|     def test_list_container(self):
 | |
|         # Add container and validate the count
 | |
|         self.client.containers.create(image="alpine", detach=True)
 | |
|         containers = self.client.containers.list(all=True)
 | |
|         self.assertEqual(len(containers), 2)
 | |
| 
 | |
|     def test_filters(self):
 | |
|         self.skipTest("TODO Endpoint does not yet support filters")
 | |
| 
 | |
|         # List container with filter by id
 | |
|         filters = {"id": TestContainers.topContainerId}
 | |
|         ctnrs = self.client.containers.list(all=True, filters=filters)
 | |
|         self.assertEqual(len(ctnrs), 1)
 | |
| 
 | |
|         # List container with filter by name
 | |
|         filters = {"name": "top"}
 | |
|         ctnrs = self.client.containers.list(all=True, filters=filters)
 | |
|         self.assertEqual(len(ctnrs), 1)
 | 
