mirror of
				https://github.com/containers/podman.git
				synced 2025-10-31 18:08:51 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			171 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			171 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import collections
 | |
| import io
 | |
| import os
 | |
| import subprocess
 | |
| import sys
 | |
| import time
 | |
| import unittest
 | |
| 
 | |
| from docker import DockerClient, errors
 | |
| from docker.errors import APIError
 | |
| 
 | |
| from test.python.docker import Podman
 | |
| from test.python.docker.compat import common, constant
 | |
| 
 | |
| 
 | |
class TestImages(unittest.TestCase):
    """Image tests run through the Docker SDK against a podman system service.

    One service process is started per class (see ``setUpClass``) listening on
    tcp://127.0.0.1:8080; every test gets a fresh ``DockerClient`` and an image
    store re-populated from the podman image cache.
    """

    podman = None  # initialized podman configuration for tests
    service = None  # podman service instance

    def setUp(self):
        """Connect a client to the service and restore the cached image(s)."""
        super().setUp()
        self.client = DockerClient(base_url="tcp://127.0.0.1:8080", timeout=15)
        # Registered as a cleanup (rather than closed in tearDown) so the
        # client is released even when the restore below, or tearDown itself,
        # raises. Cleanups run after tearDown, so the close still happens last.
        self.addCleanup(self.client.close)

        TestImages.podman.restore_image_from_cache(self.client)

    def tearDown(self):
        """Remove every image so the next test starts from a clean store."""
        common.remove_all_images(self.client)
        return super().tearDown()

    @classmethod
    def setUpClass(cls):
        """Start the podman system service shared by all tests in this class.

        Raises:
            subprocess.CalledProcessError: if the service process has already
                exited after the start-up grace period.
        """
        super().setUpClass()
        TestImages.podman = Podman()
        TestImages.service = TestImages.podman.open(
            "system", "service", "tcp:127.0.0.1:8080", "--time=0"
        )
        # give the service some time to be ready...
        time.sleep(2)

        returncode = TestImages.service.poll()
        if returncode is not None:
            # unittest does not call tearDownClass when setUpClass raises, so
            # release the podman test environment here before reporting.
            TestImages.podman.tear_down()
            raise subprocess.CalledProcessError(returncode, "podman system service")

    @classmethod
    def tearDownClass(cls):
        """Stop the service, echo any captured output, and tear down podman."""
        try:
            TestImages.service.terminate()
            try:
                stdout, stderr = TestImages.service.communicate(timeout=0.5)
            except subprocess.TimeoutExpired:
                # The service did not exit within the grace period; force it
                # down so the process is not leaked, then collect its output.
                TestImages.service.kill()
                stdout, stderr = TestImages.service.communicate()

            if stdout:
                sys.stdout.write("\nImages Service Stdout:\n" + stdout.decode("utf-8"))
            if stderr:
                sys.stderr.write("\nImAges Service Stderr:\n" + stderr.decode("utf-8"))
        finally:
            # Always release the podman environment, even if stopping the
            # service or decoding its output failed.
            TestImages.podman.tear_down()
        return super().tearDownClass()

    def test_tag_valid_image(self):
        """Validates if the image is tagged successfully"""
        alpine = self.client.images.get(constant.ALPINE)
        self.assertTrue(alpine.tag("demo", constant.ALPINE_SHORTNAME))

        # Re-fetch so the tag list reflects the state after tagging.
        alpine = self.client.images.get(constant.ALPINE)
        for t in alpine.tags:
            self.assertIn("alpine", t)

    # @unittest.skip("doesn't work now")
    def test_retag_valid_image(self):
        """Validates if name updates when the image is retagged"""
        alpine = self.client.images.get(constant.ALPINE)
        self.assertTrue(alpine.tag("demo", "rename"))

        alpine = self.client.images.get(constant.ALPINE)
        # NOTE(review): the image was tagged "demo:rename" above, yet this
        # checks for "demo:test", which nothing in this test ever creates.
        # Confirm the intended tag before tightening this assertion.
        self.assertNotIn("demo:test", alpine.tags)

    def test_list_images(self):
        """List images"""
        self.assertEqual(len(self.client.images.list()), 1)

        # Add more images
        self.client.images.pull(constant.BB)
        self.assertEqual(len(self.client.images.list()), 2)

        # List images with filter
        self.assertEqual(len(self.client.images.list(filters={"reference": "alpine"})), 1)

    def test_search_image(self):
        """Search for image"""
        for r in self.client.images.search("alpine"):
            self.assertIn("alpine", r["Name"])

    def test_search_bogus_image(self):
        """Search for bogus image should throw exception"""
        try:
            results = self.client.images.search("bogus/bogus")
        except Exception:
            # Any error raised by the search is an accepted outcome. Catching
            # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
            # propagate instead of being counted as a pass.
            return
        # If the search did not raise, it must at least come back empty.
        self.assertEqual(len(results), 0)

    def test_remove_image(self):
        """Remove image"""
        # Check for error with wrong image name
        with self.assertRaises(errors.NotFound):
            self.client.images.remove("dummy")
        self.assertEqual(len(self.client.images.list()), 1)

        self.client.images.remove(constant.ALPINE)
        self.assertEqual(len(self.client.images.list()), 0)

    def test_image_history(self):
        """Image history"""
        img = self.client.images.get(constant.ALPINE)
        history = img.history()
        # Compare on the bare digest: drop the "sha256:" prefix when present.
        image_id = img.id[7:] if img.id.startswith("sha256:") else img.id

        found = any(image_id in change.values() for change in history)
        self.assertTrue(found, f"image id {image_id} not found in history")

    def test_get_image_exists_not(self):
        """Negative test for get image"""
        with self.assertRaises(errors.NotFound):
            response = self.client.images.get("image_does_not_exists")
            # Only reached when get() itself did not raise; consuming the
            # response gives a lazily evaluated result the chance to fail.
            collections.deque(response)

    def test_save_image(self):
        """Export Image"""
        image = self.client.images.pull(constant.BB)

        file = os.path.join(TestImages.podman.image_cache, "busybox.tar")
        with open(file, mode="wb") as tarball:
            for frame in image.save(named=True):
                tarball.write(frame)
        sz = os.path.getsize(file)
        self.assertGreater(sz, 0)

    def test_load_image(self):
        """Import|Load Image"""
        self.assertEqual(len(self.client.images.list()), 1)

        image = self.client.images.pull(constant.BB)
        file = os.path.join(TestImages.podman.image_cache, "busybox.tar")
        with open(file, mode="wb") as tarball:
            for frame in image.save():
                tarball.write(frame)

        with open(file, mode="rb") as saved:
            _ = self.client.images.load(saved)

        self.assertEqual(len(self.client.images.list()), 2)

    def test_load_corrupt_image(self):
        """Import|Load Image failure"""
        tarball = io.BytesIO("This is a corrupt tarball".encode("utf-8"))
        with self.assertRaises(APIError):
            self.client.images.load(tarball)

    def test_build_image(self):
        """Build an image with labels and verify they are set on the result"""
        labels = {"apple": "red", "grape": "green"}
        _ = self.client.images.build(
            path="test/python/docker/build_labels", labels=labels, tag="labels", isolation="default"
        )
        image = self.client.images.get("labels")
        self.assertEqual(image.labels["apple"], labels["apple"])
        self.assertEqual(image.labels["grape"], labels["grape"])
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner,
    # which discovers and runs the test cases defined in this module.
    unittest.main()
 | 
