mirror of
https://github.com/containers/podman.git
synced 2025-09-18 15:54:49 +08:00
Update CI tests to run python docker library against API
* Update reference to docker-py to docker to reflect change in library name * Update tests to create storage sandbox * Enable all tests that endpoints support * Refactor containers/{id}/rename to return 404 not 500 * Refactor tests to use quay.io vs. docker.io Signed-off-by: Jhon Honce <jhonce@redhat.com>
This commit is contained in:
38
test/python/docker/README.md
Normal file
38
test/python/docker/README.md
Normal file
@ -0,0 +1,38 @@
|
||||
# Docker regression test
|
||||
|
||||
Python test suite to validate Podman endpoints using docker library (aka docker-py).
|
||||
See [Docker SDK for Python](https://docker-py.readthedocs.io/en/stable/index.html).
|
||||
|
||||
## Running Tests
|
||||
|
||||
To run the tests locally in your sandbox (Fedora 32,33):
|
||||
|
||||
```shell
|
||||
# dnf install python3-docker
|
||||
```
|
||||
|
||||
### Run the entire test suite
|
||||
|
||||
```shell
|
||||
# python3 -m unittest discover test/python/docker
|
||||
```
|
||||
|
||||
Passing the -v option to your test script will instruct unittest.main() to enable a higher level of verbosity, and produce detailed output:
|
||||
|
||||
```shell
|
||||
# python3 -m unittest -v discover test/python/docker
|
||||
```
|
||||
|
||||
### Run a specific test class
|
||||
|
||||
```shell
|
||||
# cd test/python/docker
|
||||
# python3 -m unittest -v tests.test_images
|
||||
```
|
||||
|
||||
### Run a specific test within the test class
|
||||
|
||||
```shell
|
||||
# cd test/python/docker
|
||||
# python3 -m unittest tests.test_images.TestImages.test_import_image
|
||||
```
|
157
test/python/docker/__init__.py
Normal file
157
test/python/docker/__init__.py
Normal file
@ -0,0 +1,157 @@
|
||||
import configparser
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
from test.python.docker import constant
|
||||
|
||||
|
||||
class Podman(object):
|
||||
"""
|
||||
Instances hold the configuration and setup for running podman commands
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize a Podman instance with global options"""
|
||||
binary = os.getenv("PODMAN", "bin/podman")
|
||||
self.cmd = [binary, "--storage-driver=vfs"]
|
||||
|
||||
cgroupfs = os.getenv("CGROUP_MANAGER", "systemd")
|
||||
self.cmd.append(f"--cgroup-manager={cgroupfs}")
|
||||
|
||||
# No support for tmpfs (/tmp) or extfs (/var/tmp)
|
||||
# self.cmd.append("--storage-driver=overlay")
|
||||
|
||||
if os.getenv("DEBUG"):
|
||||
self.cmd.append("--log-level=debug")
|
||||
self.cmd.append("--syslog=true")
|
||||
|
||||
self.anchor_directory = tempfile.mkdtemp(prefix="podman_docker_")
|
||||
|
||||
self.image_cache = os.path.join(self.anchor_directory, "cache")
|
||||
os.makedirs(self.image_cache, exist_ok=True)
|
||||
|
||||
self.cmd.append("--root=" + os.path.join(self.anchor_directory, "crio"))
|
||||
self.cmd.append("--runroot=" + os.path.join(self.anchor_directory, "crio-run"))
|
||||
|
||||
os.environ["REGISTRIES_CONFIG_PATH"] = os.path.join(
|
||||
self.anchor_directory, "registry.conf"
|
||||
)
|
||||
p = configparser.ConfigParser()
|
||||
p.read_dict(
|
||||
{
|
||||
"registries.search": {"registries": "['quay.io', 'docker.io']"},
|
||||
"registries.insecure": {"registries": "[]"},
|
||||
"registries.block": {"registries": "[]"},
|
||||
}
|
||||
)
|
||||
with open(os.environ["REGISTRIES_CONFIG_PATH"], "w") as w:
|
||||
p.write(w)
|
||||
|
||||
os.environ["CNI_CONFIG_PATH"] = os.path.join(
|
||||
self.anchor_directory, "cni", "net.d"
|
||||
)
|
||||
os.makedirs(os.environ["CNI_CONFIG_PATH"], exist_ok=True)
|
||||
self.cmd.append("--cni-config-dir=" + os.environ["CNI_CONFIG_PATH"])
|
||||
cni_cfg = os.path.join(
|
||||
os.environ["CNI_CONFIG_PATH"], "87-podman-bridge.conflist"
|
||||
)
|
||||
# json decoded and encoded to ensure legal json
|
||||
buf = json.loads(
|
||||
"""
|
||||
{
|
||||
"cniVersion": "0.3.0",
|
||||
"name": "podman",
|
||||
"plugins": [{
|
||||
"type": "bridge",
|
||||
"bridge": "cni0",
|
||||
"isGateway": true,
|
||||
"ipMasq": true,
|
||||
"ipam": {
|
||||
"type": "host-local",
|
||||
"subnet": "10.88.0.0/16",
|
||||
"routes": [{
|
||||
"dst": "0.0.0.0/0"
|
||||
}]
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "portmap",
|
||||
"capabilities": {
|
||||
"portMappings": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
)
|
||||
with open(cni_cfg, "w") as w:
|
||||
json.dump(buf, w)
|
||||
|
||||
def open(self, command, *args, **kwargs):
|
||||
"""Podman initialized instance to run a given command
|
||||
|
||||
:param self: Podman instance
|
||||
:param command: podman sub-command to run
|
||||
:param args: arguments and options for command
|
||||
:param kwargs: See subprocess.Popen() for shell keyword
|
||||
:return: subprocess.Popen() instance configured to run podman instance
|
||||
"""
|
||||
cmd = self.cmd.copy()
|
||||
cmd.append(command)
|
||||
cmd.extend(args)
|
||||
|
||||
shell = kwargs.get("shell", False)
|
||||
|
||||
return subprocess.Popen(
|
||||
cmd,
|
||||
shell=shell,
|
||||
stdin=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
)
|
||||
|
||||
def run(self, command, *args, **kwargs):
|
||||
"""Podman initialized instance to run a given command
|
||||
|
||||
:param self: Podman instance
|
||||
:param command: podman sub-command to run
|
||||
:param args: arguments and options for command
|
||||
:param kwargs: See subprocess.Popen() for shell and check keywords
|
||||
:return: subprocess.Popen() instance configured to run podman instance
|
||||
"""
|
||||
cmd = self.cmd.copy()
|
||||
cmd.append(command)
|
||||
cmd.extend(args)
|
||||
|
||||
check = kwargs.get("check", False)
|
||||
shell = kwargs.get("shell", False)
|
||||
|
||||
return subprocess.run(
|
||||
cmd,
|
||||
shell=shell,
|
||||
check=check,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
|
||||
def tear_down(self):
|
||||
shutil.rmtree(self.anchor_directory, ignore_errors=True)
|
||||
|
||||
def restore_image_from_cache(self, client):
|
||||
img = os.path.join(self.image_cache, constant.ALPINE_TARBALL)
|
||||
if not os.path.exists(img):
|
||||
client.pull(constant.ALPINE)
|
||||
image = client.get_image(constant.ALPINE)
|
||||
with open(img, mode="wb") as tarball:
|
||||
for frame in image:
|
||||
tarball.write(frame)
|
||||
else:
|
||||
self.run("load", "-i", img, check=True)
|
||||
|
||||
def flush_image_cache(self):
|
||||
for f in pathlib.Path(self.image_cache).glob("*.tar"):
|
||||
f.unlink(f)
|
21
test/python/docker/common.py
Normal file
21
test/python/docker/common.py
Normal file
@ -0,0 +1,21 @@
|
||||
from docker import APIClient
|
||||
|
||||
from test.python.docker import constant
|
||||
|
||||
|
||||
def run_top_container(client: APIClient):
|
||||
c = client.create_container(
|
||||
constant.ALPINE, command="top", detach=True, tty=True, name="top"
|
||||
)
|
||||
client.start(c.get("Id"))
|
||||
return c.get("Id")
|
||||
|
||||
|
||||
def remove_all_containers(client: APIClient):
|
||||
for ctnr in client.containers(quiet=True):
|
||||
client.remove_container(ctnr, force=True)
|
||||
|
||||
|
||||
def remove_all_images(client: APIClient):
|
||||
for image in client.images(quiet=True):
|
||||
client.remove_image(image, force=True)
|
6
test/python/docker/constant.py
Normal file
6
test/python/docker/constant.py
Normal file
@ -0,0 +1,6 @@
|
||||
ALPINE = "quay.io/libpod/alpine:latest"
|
||||
ALPINE_SHORTNAME = "alpine"
|
||||
ALPINE_TARBALL = "alpine.tar"
|
||||
BB = "quay.io/libpod/busybox:latest"
|
||||
NGINX = "quay.io/libpod/alpine_nginx:latest"
|
||||
infra = "k8s.gcr.io/pause:3.2"
|
214
test/python/docker/test_containers.py
Normal file
214
test/python/docker/test_containers.py
Normal file
@ -0,0 +1,214 @@
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from docker import APIClient, errors
|
||||
|
||||
from test.python.docker import Podman, common, constant
|
||||
|
||||
|
||||
class TestContainers(unittest.TestCase):
|
||||
podman = None # initialized podman configuration for tests
|
||||
service = None # podman service instance
|
||||
topContainerId = ""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.client = APIClient(base_url="tcp://127.0.0.1:8080", timeout=15)
|
||||
TestContainers.podman.restore_image_from_cache(self.client)
|
||||
TestContainers.topContainerId = common.run_top_container(self.client)
|
||||
self.assertIsNotNone(TestContainers.topContainerId)
|
||||
|
||||
def tearDown(self):
|
||||
common.remove_all_containers(self.client)
|
||||
common.remove_all_images(self.client)
|
||||
self.client.close()
|
||||
return super().tearDown()
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
TestContainers.podman = Podman()
|
||||
TestContainers.service = TestContainers.podman.open(
|
||||
"system", "service", "tcp:127.0.0.1:8080", "--time=0"
|
||||
)
|
||||
# give the service some time to be ready...
|
||||
time.sleep(2)
|
||||
|
||||
rc = TestContainers.service.poll()
|
||||
if rc is not None:
|
||||
raise subprocess.CalledProcessError(rc, "podman system service")
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
TestContainers.service.terminate()
|
||||
stdout, stderr = TestContainers.service.communicate(timeout=0.5)
|
||||
if stdout:
|
||||
sys.stdout.write("\nContainers Service Stdout:\n" + stdout.decode("utf-8"))
|
||||
if stderr:
|
||||
sys.stderr.write("\nContainers Service Stderr:\n" + stderr.decode("utf-8"))
|
||||
|
||||
TestContainers.podman.tear_down()
|
||||
return super().tearDownClass()
|
||||
|
||||
def test_inspect_container(self):
|
||||
# Inspect bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.inspect_container("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Inspect valid container by Id
|
||||
container = self.client.inspect_container(TestContainers.topContainerId)
|
||||
self.assertIn("top", container["Name"])
|
||||
|
||||
# Inspect valid container by name
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertIn(TestContainers.topContainerId, container["Id"])
|
||||
|
||||
def test_create_container(self):
|
||||
# Run a container with detach mode
|
||||
container = self.client.create_container(image="alpine", detach=True)
|
||||
self.assertEqual(len(container), 2)
|
||||
|
||||
def test_start_container(self):
|
||||
# Start bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.start("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Podman docs says it should give a 304 but returns with no response
|
||||
# # Start a already started container should return 304
|
||||
# response = self.client.start(container=TestContainers.topContainerId)
|
||||
# self.assertEqual(error.exception.response.status_code, 304)
|
||||
|
||||
# Create a new container and validate the count
|
||||
self.client.create_container(image=constant.ALPINE, name="container2")
|
||||
containers = self.client.containers(quiet=True, all=True)
|
||||
self.assertEqual(len(containers), 2)
|
||||
|
||||
def test_stop_container(self):
|
||||
# Stop bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.stop("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Validate the container state
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "running")
|
||||
|
||||
# Stop a running container and validate the state
|
||||
self.client.stop(TestContainers.topContainerId)
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertIn(
|
||||
container["State"]["Status"],
|
||||
"stopped exited",
|
||||
)
|
||||
|
||||
def test_restart_container(self):
|
||||
# Restart bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.restart("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Validate the container state
|
||||
self.client.stop(TestContainers.topContainerId)
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "stopped")
|
||||
|
||||
# restart a running container and validate the state
|
||||
self.client.restart(TestContainers.topContainerId)
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "running")
|
||||
|
||||
def test_remove_container(self):
|
||||
# Remove bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.remove_container("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Remove container by ID with force
|
||||
self.client.remove_container(TestContainers.topContainerId, force=True)
|
||||
containers = self.client.containers()
|
||||
self.assertEqual(len(containers), 0)
|
||||
|
||||
def test_remove_container_without_force(self):
|
||||
# Validate current container count
|
||||
containers = self.client.containers()
|
||||
self.assertTrue(len(containers), 1)
|
||||
|
||||
# Remove running container should throw error
|
||||
with self.assertRaises(errors.APIError) as error:
|
||||
self.client.remove_container(TestContainers.topContainerId)
|
||||
self.assertEqual(error.exception.response.status_code, 500)
|
||||
|
||||
# Remove container by ID with force
|
||||
self.client.stop(TestContainers.topContainerId)
|
||||
self.client.remove_container(TestContainers.topContainerId)
|
||||
containers = self.client.containers()
|
||||
self.assertEqual(len(containers), 0)
|
||||
|
||||
def test_pause_container(self):
|
||||
# Pause bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.pause("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Validate the container state
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "running")
|
||||
|
||||
# Pause a running container and validate the state
|
||||
self.client.pause(container["Id"])
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "paused")
|
||||
|
||||
def test_pause_stopped_container(self):
|
||||
# Stop the container
|
||||
self.client.stop(TestContainers.topContainerId)
|
||||
|
||||
# Pause exited container should trow error
|
||||
with self.assertRaises(errors.APIError) as error:
|
||||
self.client.pause(TestContainers.topContainerId)
|
||||
self.assertEqual(error.exception.response.status_code, 500)
|
||||
|
||||
def test_unpause_container(self):
|
||||
# Unpause bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.unpause("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Validate the container state
|
||||
self.client.pause(TestContainers.topContainerId)
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "paused")
|
||||
|
||||
# Pause a running container and validate the state
|
||||
self.client.unpause(TestContainers.topContainerId)
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "running")
|
||||
|
||||
def test_list_container(self):
|
||||
# Add container and validate the count
|
||||
self.client.create_container(image="alpine", detach=True)
|
||||
containers = self.client.containers(all=True)
|
||||
self.assertEqual(len(containers), 2)
|
||||
|
||||
def test_filters(self):
|
||||
self.skipTest("TODO Endpoint does not yet support filters")
|
||||
|
||||
# List container with filter by id
|
||||
filters = {"id": TestContainers.topContainerId}
|
||||
ctnrs = self.client.containers(all=True, filters=filters)
|
||||
self.assertEqual(len(ctnrs), 1)
|
||||
|
||||
# List container with filter by name
|
||||
filters = {"name": "top"}
|
||||
ctnrs = self.client.containers(all=True, filters=filters)
|
||||
self.assertEqual(len(ctnrs), 1)
|
||||
|
||||
def test_rename_container(self):
|
||||
# rename bogus container
|
||||
with self.assertRaises(errors.APIError) as error:
|
||||
self.client.rename(container="dummy", name="newname")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
169
test/python/docker/test_images.py
Normal file
169
test/python/docker/test_images.py
Normal file
@ -0,0 +1,169 @@
|
||||
import collections
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from docker import APIClient, errors
|
||||
|
||||
from test.python.docker import Podman, common, constant
|
||||
|
||||
|
||||
class TestImages(unittest.TestCase):
|
||||
podman = None # initialized podman configuration for tests
|
||||
service = None # podman service instance
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.client = APIClient(base_url="tcp://127.0.0.1:8080", timeout=15)
|
||||
|
||||
TestImages.podman.restore_image_from_cache(self.client)
|
||||
|
||||
def tearDown(self):
|
||||
common.remove_all_images(self.client)
|
||||
self.client.close()
|
||||
return super().tearDown()
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
TestImages.podman = Podman()
|
||||
TestImages.service = TestImages.podman.open(
|
||||
"system", "service", "tcp:127.0.0.1:8080", "--time=0"
|
||||
)
|
||||
# give the service some time to be ready...
|
||||
time.sleep(2)
|
||||
|
||||
returncode = TestImages.service.poll()
|
||||
if returncode is not None:
|
||||
raise subprocess.CalledProcessError(returncode, "podman system service")
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
TestImages.service.terminate()
|
||||
stdout, stderr = TestImages.service.communicate(timeout=0.5)
|
||||
if stdout:
|
||||
sys.stdout.write("\nImages Service Stdout:\n" + stdout.decode("utf-8"))
|
||||
if stderr:
|
||||
sys.stderr.write("\nImAges Service Stderr:\n" + stderr.decode("utf-8"))
|
||||
|
||||
TestImages.podman.tear_down()
|
||||
return super().tearDownClass()
|
||||
|
||||
def test_inspect_image(self):
|
||||
"""Inspect Image"""
|
||||
# Check for error with wrong image name
|
||||
with self.assertRaises(errors.NotFound):
|
||||
self.client.inspect_image("dummy")
|
||||
alpine_image = self.client.inspect_image(constant.ALPINE)
|
||||
self.assertIn(constant.ALPINE, alpine_image["RepoTags"])
|
||||
|
||||
def test_tag_invalid_image(self):
|
||||
"""Tag Image
|
||||
|
||||
Validates if invalid image name is given a bad response is encountered
|
||||
"""
|
||||
with self.assertRaises(errors.NotFound):
|
||||
self.client.tag("dummy", "demo")
|
||||
|
||||
def test_tag_valid_image(self):
|
||||
"""Validates if the image is tagged successfully"""
|
||||
self.client.tag(constant.ALPINE, "demo", constant.ALPINE_SHORTNAME)
|
||||
alpine_image = self.client.inspect_image(constant.ALPINE)
|
||||
for x in alpine_image["RepoTags"]:
|
||||
self.assertIn("alpine", x)
|
||||
|
||||
# @unittest.skip("doesn't work now")
|
||||
def test_retag_valid_image(self):
|
||||
"""Validates if name updates when the image is retagged"""
|
||||
self.client.tag(constant.ALPINE_SHORTNAME, "demo", "rename")
|
||||
alpine_image = self.client.inspect_image(constant.ALPINE)
|
||||
self.assertNotIn("demo:test", alpine_image["RepoTags"])
|
||||
|
||||
def test_list_images(self):
|
||||
"""List images"""
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 1)
|
||||
# Add more images
|
||||
self.client.pull(constant.BB)
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 2)
|
||||
|
||||
# List images with filter
|
||||
filters = {"reference": "alpine"}
|
||||
all_images = self.client.images(filters=filters)
|
||||
self.assertEqual(len(all_images), 1)
|
||||
|
||||
def test_search_image(self):
|
||||
"""Search for image"""
|
||||
response = self.client.search("libpod/alpine")
|
||||
for i in response:
|
||||
self.assertIn("quay.io/libpod/alpine", i["Name"])
|
||||
|
||||
def test_remove_image(self):
|
||||
"""Remove image"""
|
||||
# Check for error with wrong image name
|
||||
with self.assertRaises(errors.NotFound):
|
||||
self.client.remove_image("dummy")
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 1)
|
||||
|
||||
alpine_image = self.client.inspect_image(constant.ALPINE)
|
||||
self.client.remove_image(alpine_image["Id"])
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 0)
|
||||
|
||||
def test_image_history(self):
|
||||
"""Image history"""
|
||||
# Check for error with wrong image name
|
||||
with self.assertRaises(errors.NotFound):
|
||||
self.client.history("dummy")
|
||||
|
||||
# NOTE: history() has incorrect return type hint
|
||||
history = self.client.history(constant.ALPINE)
|
||||
alpine_image = self.client.inspect_image(constant.ALPINE)
|
||||
image_id = (
|
||||
alpine_image["Id"][7:]
|
||||
if alpine_image["Id"].startswith("sha256:")
|
||||
else alpine_image["Id"]
|
||||
)
|
||||
|
||||
found = False
|
||||
for change in history:
|
||||
found |= image_id in change.values()
|
||||
self.assertTrue(found, f"image id {image_id} not found in history")
|
||||
|
||||
def test_get_image_exists_not(self):
|
||||
"""Negative test for get image"""
|
||||
with self.assertRaises(errors.NotFound):
|
||||
response = self.client.get_image("image_does_not_exists")
|
||||
collections.deque(response)
|
||||
|
||||
def test_export_image(self):
|
||||
"""Export Image"""
|
||||
self.client.pull(constant.BB)
|
||||
image = self.client.get_image(constant.BB)
|
||||
|
||||
file = os.path.join(TestImages.podman.image_cache, "busybox.tar")
|
||||
with open(file, mode="wb") as tarball:
|
||||
for frame in image:
|
||||
tarball.write(frame)
|
||||
sz = os.path.getsize(file)
|
||||
self.assertGreater(sz, 0)
|
||||
|
||||
def test_import_image(self):
|
||||
"""Import|Load Image"""
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 1)
|
||||
|
||||
file = os.path.join(TestImages.podman.image_cache, constant.ALPINE_TARBALL)
|
||||
self.client.import_image_from_file(filename=file)
|
||||
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 2)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Setup temporary space
|
||||
unittest.main()
|
66
test/python/docker/test_system.py
Normal file
66
test/python/docker/test_system.py
Normal file
@ -0,0 +1,66 @@
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from docker import APIClient
|
||||
|
||||
from test.python.docker import Podman, common, constant
|
||||
|
||||
|
||||
class TestSystem(unittest.TestCase):
|
||||
podman = None # initialized podman configuration for tests
|
||||
service = None # podman service instance
|
||||
topContainerId = ""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.client = APIClient(base_url="tcp://127.0.0.1:8080", timeout=15)
|
||||
|
||||
TestSystem.podman.restore_image_from_cache(self.client)
|
||||
TestSystem.topContainerId = common.run_top_container(self.client)
|
||||
|
||||
def tearDown(self):
|
||||
common.remove_all_containers(self.client)
|
||||
common.remove_all_images(self.client)
|
||||
self.client.close()
|
||||
return super().tearDown()
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
TestSystem.podman = Podman()
|
||||
TestSystem.service = TestSystem.podman.open(
|
||||
"system", "service", "tcp:127.0.0.1:8080", "--time=0"
|
||||
)
|
||||
# give the service some time to be ready...
|
||||
time.sleep(2)
|
||||
|
||||
returncode = TestSystem.service.poll()
|
||||
if returncode is not None:
|
||||
raise subprocess.CalledProcessError(returncode, "podman system service")
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
TestSystem.service.terminate()
|
||||
stdout, stderr = TestSystem.service.communicate(timeout=0.5)
|
||||
if stdout:
|
||||
sys.stdout.write("\nImages Service Stdout:\n" + stdout.decode("utf-8"))
|
||||
if stderr:
|
||||
sys.stderr.write("\nImAges Service Stderr:\n" + stderr.decode("utf-8"))
|
||||
|
||||
TestSystem.podman.tear_down()
|
||||
return super().tearDownClass()
|
||||
|
||||
def test_Info(self):
|
||||
self.assertIsNotNone(self.client.info())
|
||||
|
||||
def test_info_container_details(self):
|
||||
info = self.client.info()
|
||||
self.assertEqual(info["Containers"], 1)
|
||||
self.client.create_container(image=constant.ALPINE)
|
||||
info = self.client.info()
|
||||
self.assertEqual(info["Containers"], 2)
|
||||
|
||||
def test_version(self):
|
||||
self.assertIsNotNone(self.client.version())
|
Reference in New Issue
Block a user