mirror of
https://github.com/containers/podman.git
synced 2025-08-06 03:19:52 +08:00
Refactor to use DockerClient vs APIClient
* Update tests and framework * remove tests for APIClient methods Signed-off-by: Jhon Honce <jhonce@redhat.com>
This commit is contained in:
@ -17,9 +17,9 @@
|
||||
|
||||
# -- Project information -----------------------------------------------------
|
||||
|
||||
project = 'Podman'
|
||||
copyright = '2019, team'
|
||||
author = 'team'
|
||||
project = "Podman"
|
||||
copyright = "2019, team"
|
||||
author = "team"
|
||||
|
||||
|
||||
# -- General configuration ---------------------------------------------------
|
||||
@ -28,33 +28,33 @@ author = 'team'
|
||||
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
|
||||
# ones.
|
||||
extensions = [
|
||||
'recommonmark',
|
||||
"recommonmark",
|
||||
]
|
||||
|
||||
# Add any paths that contain templates here, relative to this directory.
|
||||
templates_path = ['_templates']
|
||||
templates_path = ["_templates"]
|
||||
|
||||
# List of patterns, relative to source directory, that match files and
|
||||
# directories to ignore when looking for source files.
|
||||
# This pattern also affects html_static_path and html_extra_path.
|
||||
exclude_patterns = []
|
||||
|
||||
master_doc = 'index'
|
||||
master_doc = "index"
|
||||
|
||||
# -- Options for HTML output -------------------------------------------------
|
||||
|
||||
# The theme to use for HTML and HTML Help pages. See the documentation for
|
||||
# a list of builtin themes.
|
||||
#
|
||||
html_theme = 'alabaster'
|
||||
html_theme = "alabaster"
|
||||
|
||||
# Add any paths that contain custom static files (such as style sheets) here,
|
||||
# relative to this directory. They are copied after the builtin static files,
|
||||
# so a file named "default.css" will overwrite the builtin "default.css".
|
||||
html_static_path = ['_static']
|
||||
html_static_path = ["_static"]
|
||||
|
||||
html_css_files = [
|
||||
'custom.css',
|
||||
"custom.css",
|
||||
]
|
||||
|
||||
# -- Extension configuration -------------------------------------------------
|
||||
|
@ -28,17 +28,17 @@ func InspectNetwork(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// FYI scope and version are currently unused but are described by the API
|
||||
// Leaving this for if/when we have to enable these
|
||||
//query := struct {
|
||||
// query := struct {
|
||||
// scope string
|
||||
// verbose bool
|
||||
//}{
|
||||
// }{
|
||||
// // override any golang type defaults
|
||||
//}
|
||||
//decoder := r.Context().Value("decoder").(*schema.Decoder)
|
||||
//if err := decoder.Decode(&query, r.URL.Query()); err != nil {
|
||||
// }
|
||||
// decoder := r.Context().Value("decoder").(*schema.Decoder)
|
||||
// if err := decoder.Decode(&query, r.URL.Query()); err != nil {
|
||||
// utils.Error(w, "Something went wrong.", http.StatusBadRequest, errors.Wrapf(err, "failed to parse parameters for %s", r.URL.String()))
|
||||
// return
|
||||
//}
|
||||
// }
|
||||
config, err := runtime.GetConfig()
|
||||
if err != nil {
|
||||
utils.InternalServerError(w, err)
|
||||
@ -119,7 +119,7 @@ func getNetworkResourceByName(name string, runtime *libpod.Runtime) (*types.Netw
|
||||
}
|
||||
report := types.NetworkResource{
|
||||
Name: name,
|
||||
ID: "",
|
||||
ID: name,
|
||||
Created: time.Unix(int64(stat.Ctim.Sec), int64(stat.Ctim.Nsec)), // nolint: unconvert
|
||||
Scope: "",
|
||||
Driver: network.DefaultNetworkDriver,
|
||||
@ -207,6 +207,7 @@ func ListNetworks(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
reports := make([]*types.NetworkResource, 0, len(netNames))
|
||||
logrus.Errorf("netNames: %q", strings.Join(netNames, ", "))
|
||||
for _, name := range netNames {
|
||||
report, err := getNetworkResourceByName(name, runtime)
|
||||
if err != nil {
|
||||
@ -276,21 +277,14 @@ func CreateNetwork(w http.ResponseWriter, r *http.Request) {
|
||||
utils.InternalServerError(w, err)
|
||||
return
|
||||
}
|
||||
report := types.NetworkCreate{
|
||||
CheckDuplicate: networkCreate.CheckDuplicate,
|
||||
Driver: networkCreate.Driver,
|
||||
Scope: networkCreate.Scope,
|
||||
EnableIPv6: networkCreate.EnableIPv6,
|
||||
IPAM: networkCreate.IPAM,
|
||||
Internal: networkCreate.Internal,
|
||||
Attachable: networkCreate.Attachable,
|
||||
Ingress: networkCreate.Ingress,
|
||||
ConfigOnly: networkCreate.ConfigOnly,
|
||||
ConfigFrom: networkCreate.ConfigFrom,
|
||||
Options: networkCreate.Options,
|
||||
Labels: networkCreate.Labels,
|
||||
|
||||
body := struct {
|
||||
Id string
|
||||
Warning []string
|
||||
}{
|
||||
Id: name,
|
||||
}
|
||||
utils.WriteResponse(w, http.StatusOK, report)
|
||||
utils.WriteResponse(w, http.StatusCreated, body)
|
||||
}
|
||||
|
||||
func RemoveNetwork(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -165,11 +165,34 @@ class TestApi(unittest.TestCase):
|
||||
r = requests.get(_url(ctnr("/containers/{}/logs?stdout=true")))
|
||||
self.assertEqual(r.status_code, 200, r.text)
|
||||
|
||||
def test_post_create(self):
|
||||
self.skipTest("TODO: create request body")
|
||||
r = requests.post(_url("/containers/create?args=True"))
|
||||
self.assertEqual(r.status_code, 200, r.text)
|
||||
json.loads(r.text)
|
||||
def test_post_create_compat(self):
|
||||
"""Create network and container then connect to network"""
|
||||
net = requests.post(
|
||||
PODMAN_URL + "/v1.40/networks/create", json={"Name": "TestNetwork"}
|
||||
)
|
||||
self.assertEqual(net.status_code, 201, net.text)
|
||||
|
||||
create = requests.post(
|
||||
PODMAN_URL + "/v1.40/containers/create?name=postCreate",
|
||||
json={
|
||||
"Cmd": ["date"],
|
||||
"Image": "alpine:latest",
|
||||
"NetworkDisabled": False,
|
||||
"NetworkConfig": {
|
||||
"EndpointConfig": {"TestNetwork": {"Aliases": ["test_post_create"]}}
|
||||
},
|
||||
},
|
||||
)
|
||||
self.assertEqual(create.status_code, 201, create.text)
|
||||
payload = json.loads(create.text)
|
||||
self.assertIsNotNone(payload["Id"])
|
||||
|
||||
connect = requests.post(
|
||||
PODMAN_URL + "/v1.40/networks/TestNetwork/connect",
|
||||
json={"Container": payload["Id"]},
|
||||
)
|
||||
self.assertEqual(connect.status_code, 200, create.text)
|
||||
self.assertEqual(connect.text, "OK\n")
|
||||
|
||||
def test_commit(self):
|
||||
r = requests.post(_url(ctnr("/commit?container={}")))
|
||||
|
@ -6,6 +6,8 @@ import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
from docker import DockerClient
|
||||
|
||||
from test.python.docker import constant
|
||||
|
||||
|
||||
@ -141,16 +143,15 @@ class Podman(object):
|
||||
def tear_down(self):
|
||||
shutil.rmtree(self.anchor_directory, ignore_errors=True)
|
||||
|
||||
def restore_image_from_cache(self, client):
|
||||
img = os.path.join(self.image_cache, constant.ALPINE_TARBALL)
|
||||
if not os.path.exists(img):
|
||||
client.pull(constant.ALPINE)
|
||||
image = client.get_image(constant.ALPINE)
|
||||
with open(img, mode="wb") as tarball:
|
||||
for frame in image:
|
||||
def restore_image_from_cache(self, client: DockerClient):
|
||||
path = os.path.join(self.image_cache, constant.ALPINE_TARBALL)
|
||||
if not os.path.exists(path):
|
||||
img = client.images.pull(constant.ALPINE)
|
||||
with open(path, mode="wb") as tarball:
|
||||
for frame in img.save(named=True):
|
||||
tarball.write(frame)
|
||||
else:
|
||||
self.run("load", "-i", img, check=True)
|
||||
self.run("load", "-i", path, check=True)
|
||||
|
||||
def flush_image_cache(self):
|
||||
for f in pathlib.Path(self.image_cache).glob("*.tar"):
|
||||
|
@ -1,21 +1,23 @@
|
||||
from docker import APIClient
|
||||
from docker import DockerClient
|
||||
|
||||
from test.python.docker import constant
|
||||
|
||||
|
||||
def run_top_container(client: APIClient):
|
||||
c = client.create_container(
|
||||
def run_top_container(client: DockerClient):
|
||||
c = client.containers.create(
|
||||
constant.ALPINE, command="top", detach=True, tty=True, name="top"
|
||||
)
|
||||
client.start(c.get("Id"))
|
||||
return c.get("Id")
|
||||
c.start()
|
||||
return c.id
|
||||
|
||||
|
||||
def remove_all_containers(client: APIClient):
|
||||
for ctnr in client.containers(quiet=True):
|
||||
client.remove_container(ctnr, force=True)
|
||||
def remove_all_containers(client: DockerClient):
|
||||
for ctnr in client.containers.list(all=True):
|
||||
ctnr.remove(force=True)
|
||||
|
||||
|
||||
def remove_all_images(client: APIClient):
|
||||
for image in client.images(quiet=True):
|
||||
client.remove_image(image, force=True)
|
||||
def remove_all_images(client: DockerClient):
|
||||
for img in client.images.list():
|
||||
# FIXME should DELETE /images accept the sha256: prefix?
|
||||
id_ = img.id.removeprefix("sha256:")
|
||||
client.images.remove(id_, force=True)
|
||||
|
@ -3,7 +3,7 @@ import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from docker import APIClient, errors
|
||||
from docker import DockerClient, errors
|
||||
|
||||
from test.python.docker import Podman, common, constant
|
||||
|
||||
@ -15,7 +15,7 @@ class TestContainers(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.client = APIClient(base_url="tcp://127.0.0.1:8080", timeout=15)
|
||||
self.client = DockerClient(base_url="tcp://127.0.0.1:8080", timeout=15)
|
||||
TestContainers.podman.restore_image_from_cache(self.client)
|
||||
TestContainers.topContainerId = common.run_top_container(self.client)
|
||||
self.assertIsNotNone(TestContainers.topContainerId)
|
||||
@ -52,146 +52,115 @@ class TestContainers(unittest.TestCase):
|
||||
TestContainers.podman.tear_down()
|
||||
return super().tearDownClass()
|
||||
|
||||
def test_inspect_container(self):
|
||||
# Inspect bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.inspect_container("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Inspect valid container by Id
|
||||
container = self.client.inspect_container(TestContainers.topContainerId)
|
||||
self.assertIn("top", container["Name"])
|
||||
|
||||
# Inspect valid container by name
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertIn(TestContainers.topContainerId, container["Id"])
|
||||
|
||||
def test_create_container(self):
|
||||
# Run a container with detach mode
|
||||
container = self.client.create_container(image="alpine", detach=True)
|
||||
self.assertEqual(len(container), 2)
|
||||
self.client.containers.create(image="alpine", detach=True)
|
||||
self.assertEqual(len(self.client.containers.list(all=True)), 2)
|
||||
|
||||
def test_create_network(self):
|
||||
net = self.client.networks.create("testNetwork", driver="bridge")
|
||||
ctnr = self.client.containers.create(image="alpine", detach=True)
|
||||
net.connect(ctnr)
|
||||
|
||||
nets = self.client.networks.list(greedy=True)
|
||||
self.assertGreaterEqual(len(nets), 1)
|
||||
|
||||
# TODO fix endpoint to include containers
|
||||
# for n in nets:
|
||||
# if n.id == "testNetwork":
|
||||
# self.assertEqual(ctnr.id, n.containers)
|
||||
# self.assertTrue(False, "testNetwork not found")
|
||||
|
||||
def test_start_container(self):
|
||||
# Start bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.start("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Podman docs says it should give a 304 but returns with no response
|
||||
# # Start a already started container should return 304
|
||||
# response = self.client.start(container=TestContainers.topContainerId)
|
||||
# response = self.client.api.start(container=TestContainers.topContainerId)
|
||||
# self.assertEqual(error.exception.response.status_code, 304)
|
||||
|
||||
# Create a new container and validate the count
|
||||
self.client.create_container(image=constant.ALPINE, name="container2")
|
||||
containers = self.client.containers(quiet=True, all=True)
|
||||
self.client.containers.create(image=constant.ALPINE, name="container2")
|
||||
containers = self.client.containers.list(all=True)
|
||||
self.assertEqual(len(containers), 2)
|
||||
|
||||
def test_stop_container(self):
|
||||
# Stop bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.stop("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Validate the container state
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "running")
|
||||
top = self.client.containers.get("top")
|
||||
self.assertEqual(top.status, "running")
|
||||
|
||||
# Stop a running container and validate the state
|
||||
self.client.stop(TestContainers.topContainerId)
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertIn(
|
||||
container["State"]["Status"],
|
||||
"stopped exited",
|
||||
)
|
||||
top.stop()
|
||||
top.reload()
|
||||
self.assertIn(top.status, ("stopped", "exited"))
|
||||
|
||||
def test_restart_container(self):
|
||||
# Restart bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.restart("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Validate the container state
|
||||
self.client.stop(TestContainers.topContainerId)
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "stopped")
|
||||
top = self.client.containers.get(TestContainers.topContainerId)
|
||||
top.stop()
|
||||
top.reload()
|
||||
self.assertIn(top.status, ("stopped", "exited"))
|
||||
|
||||
# restart a running container and validate the state
|
||||
self.client.restart(TestContainers.topContainerId)
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "running")
|
||||
top.restart()
|
||||
top.reload()
|
||||
self.assertEqual(top.status, "running")
|
||||
|
||||
def test_remove_container(self):
|
||||
# Remove bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.remove_container("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Remove container by ID with force
|
||||
self.client.remove_container(TestContainers.topContainerId, force=True)
|
||||
containers = self.client.containers()
|
||||
self.assertEqual(len(containers), 0)
|
||||
top = self.client.containers.get(TestContainers.topContainerId)
|
||||
top.remove(force=True)
|
||||
self.assertEqual(len(self.client.containers.list()), 0)
|
||||
|
||||
def test_remove_container_without_force(self):
|
||||
# Validate current container count
|
||||
containers = self.client.containers()
|
||||
self.assertTrue(len(containers), 1)
|
||||
self.assertTrue(len(self.client.containers.list()), 1)
|
||||
|
||||
# Remove running container should throw error
|
||||
top = self.client.containers.get(TestContainers.topContainerId)
|
||||
with self.assertRaises(errors.APIError) as error:
|
||||
self.client.remove_container(TestContainers.topContainerId)
|
||||
top.remove()
|
||||
self.assertEqual(error.exception.response.status_code, 500)
|
||||
|
||||
# Remove container by ID with force
|
||||
self.client.stop(TestContainers.topContainerId)
|
||||
self.client.remove_container(TestContainers.topContainerId)
|
||||
containers = self.client.containers()
|
||||
self.assertEqual(len(containers), 0)
|
||||
# Remove container by ID without force
|
||||
top.stop()
|
||||
top.remove()
|
||||
self.assertEqual(len(self.client.containers.list()), 0)
|
||||
|
||||
def test_pause_container(self):
|
||||
# Pause bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.pause("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
||||
# Validate the container state
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "running")
|
||||
top = self.client.containers.get(TestContainers.topContainerId)
|
||||
self.assertEqual(top.status, "running")
|
||||
|
||||
# Pause a running container and validate the state
|
||||
self.client.pause(container["Id"])
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "paused")
|
||||
top.pause()
|
||||
top.reload()
|
||||
self.assertEqual(top.status, "paused")
|
||||
|
||||
def test_pause_stopped_container(self):
|
||||
# Stop the container
|
||||
self.client.stop(TestContainers.topContainerId)
|
||||
top = self.client.containers.get(TestContainers.topContainerId)
|
||||
top.stop()
|
||||
|
||||
# Pause exited container should trow error
|
||||
with self.assertRaises(errors.APIError) as error:
|
||||
self.client.pause(TestContainers.topContainerId)
|
||||
top.pause()
|
||||
self.assertEqual(error.exception.response.status_code, 500)
|
||||
|
||||
def test_unpause_container(self):
|
||||
# Unpause bogus container
|
||||
with self.assertRaises(errors.NotFound) as error:
|
||||
self.client.unpause("dummy")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
top = self.client.containers.get(TestContainers.topContainerId)
|
||||
|
||||
# Validate the container state
|
||||
self.client.pause(TestContainers.topContainerId)
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "paused")
|
||||
top.pause()
|
||||
top.reload()
|
||||
self.assertEqual(top.status, "paused")
|
||||
|
||||
# Pause a running container and validate the state
|
||||
self.client.unpause(TestContainers.topContainerId)
|
||||
container = self.client.inspect_container("top")
|
||||
self.assertEqual(container["State"]["Status"], "running")
|
||||
top.unpause()
|
||||
top.reload()
|
||||
self.assertEqual(top.status, "running")
|
||||
|
||||
def test_list_container(self):
|
||||
# Add container and validate the count
|
||||
self.client.create_container(image="alpine", detach=True)
|
||||
containers = self.client.containers(all=True)
|
||||
self.client.containers.create(image="alpine", detach=True)
|
||||
containers = self.client.containers.list(all=True)
|
||||
self.assertEqual(len(containers), 2)
|
||||
|
||||
def test_filters(self):
|
||||
@ -199,16 +168,18 @@ class TestContainers(unittest.TestCase):
|
||||
|
||||
# List container with filter by id
|
||||
filters = {"id": TestContainers.topContainerId}
|
||||
ctnrs = self.client.containers(all=True, filters=filters)
|
||||
ctnrs = self.client.containers.list(all=True, filters=filters)
|
||||
self.assertEqual(len(ctnrs), 1)
|
||||
|
||||
# List container with filter by name
|
||||
filters = {"name": "top"}
|
||||
ctnrs = self.client.containers(all=True, filters=filters)
|
||||
ctnrs = self.client.containers.list(all=True, filters=filters)
|
||||
self.assertEqual(len(ctnrs), 1)
|
||||
|
||||
def test_rename_container(self):
|
||||
top = self.client.containers.get(TestContainers.topContainerId)
|
||||
|
||||
# rename bogus container
|
||||
with self.assertRaises(errors.APIError) as error:
|
||||
self.client.rename(container="dummy", name="newname")
|
||||
top.rename(name="newname")
|
||||
self.assertEqual(error.exception.response.status_code, 404)
|
||||
|
@ -5,7 +5,7 @@ import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from docker import APIClient, errors
|
||||
from docker import DockerClient, errors
|
||||
|
||||
from test.python.docker import Podman, common, constant
|
||||
|
||||
@ -16,7 +16,7 @@ class TestImages(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.client = APIClient(base_url="tcp://127.0.0.1:8080", timeout=15)
|
||||
self.client = DockerClient(base_url="tcp://127.0.0.1:8080", timeout=15)
|
||||
|
||||
TestImages.podman.restore_image_from_cache(self.client)
|
||||
|
||||
@ -51,83 +51,57 @@ class TestImages(unittest.TestCase):
|
||||
TestImages.podman.tear_down()
|
||||
return super().tearDownClass()
|
||||
|
||||
def test_inspect_image(self):
|
||||
"""Inspect Image"""
|
||||
# Check for error with wrong image name
|
||||
with self.assertRaises(errors.NotFound):
|
||||
self.client.inspect_image("dummy")
|
||||
alpine_image = self.client.inspect_image(constant.ALPINE)
|
||||
self.assertIn(constant.ALPINE, alpine_image["RepoTags"])
|
||||
|
||||
def test_tag_invalid_image(self):
|
||||
"""Tag Image
|
||||
|
||||
Validates if invalid image name is given a bad response is encountered
|
||||
"""
|
||||
with self.assertRaises(errors.NotFound):
|
||||
self.client.tag("dummy", "demo")
|
||||
|
||||
def test_tag_valid_image(self):
|
||||
"""Validates if the image is tagged successfully"""
|
||||
self.client.tag(constant.ALPINE, "demo", constant.ALPINE_SHORTNAME)
|
||||
alpine_image = self.client.inspect_image(constant.ALPINE)
|
||||
for x in alpine_image["RepoTags"]:
|
||||
self.assertIn("alpine", x)
|
||||
alpine = self.client.images.get(constant.ALPINE)
|
||||
self.assertTrue(alpine.tag("demo", constant.ALPINE_SHORTNAME))
|
||||
|
||||
alpine = self.client.images.get(constant.ALPINE)
|
||||
for t in alpine.tags:
|
||||
self.assertIn("alpine", t)
|
||||
|
||||
# @unittest.skip("doesn't work now")
|
||||
def test_retag_valid_image(self):
|
||||
"""Validates if name updates when the image is retagged"""
|
||||
self.client.tag(constant.ALPINE_SHORTNAME, "demo", "rename")
|
||||
alpine_image = self.client.inspect_image(constant.ALPINE)
|
||||
self.assertNotIn("demo:test", alpine_image["RepoTags"])
|
||||
alpine = self.client.images.get(constant.ALPINE)
|
||||
self.assertTrue(alpine.tag("demo", "rename"))
|
||||
|
||||
alpine = self.client.images.get(constant.ALPINE)
|
||||
self.assertNotIn("demo:test", alpine.tags)
|
||||
|
||||
def test_list_images(self):
|
||||
"""List images"""
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 1)
|
||||
self.assertEqual(len(self.client.images.list()), 1)
|
||||
|
||||
# Add more images
|
||||
self.client.pull(constant.BB)
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 2)
|
||||
self.client.images.pull(constant.BB)
|
||||
self.assertEqual(len(self.client.images.list()), 2)
|
||||
|
||||
# List images with filter
|
||||
filters = {"reference": "alpine"}
|
||||
all_images = self.client.images(filters=filters)
|
||||
self.assertEqual(len(all_images), 1)
|
||||
self.assertEqual(
|
||||
len(self.client.images.list(filters={"reference": "alpine"})), 1
|
||||
)
|
||||
|
||||
def test_search_image(self):
|
||||
"""Search for image"""
|
||||
response = self.client.search("libpod/alpine")
|
||||
for i in response:
|
||||
self.assertIn("quay.io/libpod/alpine", i["Name"])
|
||||
for r in self.client.images.search("libpod/alpine"):
|
||||
self.assertIn("quay.io/libpod/alpine", r["Name"])
|
||||
|
||||
def test_remove_image(self):
|
||||
"""Remove image"""
|
||||
# Check for error with wrong image name
|
||||
with self.assertRaises(errors.NotFound):
|
||||
self.client.remove_image("dummy")
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 1)
|
||||
self.client.images.remove("dummy")
|
||||
self.assertEqual(len(self.client.images.list()), 1)
|
||||
|
||||
alpine_image = self.client.inspect_image(constant.ALPINE)
|
||||
self.client.remove_image(alpine_image["Id"])
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 0)
|
||||
self.client.images.remove(constant.ALPINE)
|
||||
self.assertEqual(len(self.client.images.list()), 0)
|
||||
|
||||
def test_image_history(self):
|
||||
"""Image history"""
|
||||
# Check for error with wrong image name
|
||||
with self.assertRaises(errors.NotFound):
|
||||
self.client.history("dummy")
|
||||
|
||||
# NOTE: history() has incorrect return type hint
|
||||
history = self.client.history(constant.ALPINE)
|
||||
alpine_image = self.client.inspect_image(constant.ALPINE)
|
||||
image_id = (
|
||||
alpine_image["Id"][7:]
|
||||
if alpine_image["Id"].startswith("sha256:")
|
||||
else alpine_image["Id"]
|
||||
)
|
||||
img = self.client.images.get(constant.ALPINE)
|
||||
history = img.history()
|
||||
image_id = img.id[7:] if img.id.startswith("sha256:") else img.id
|
||||
|
||||
found = False
|
||||
for change in history:
|
||||
@ -137,31 +111,34 @@ class TestImages(unittest.TestCase):
|
||||
def test_get_image_exists_not(self):
|
||||
"""Negative test for get image"""
|
||||
with self.assertRaises(errors.NotFound):
|
||||
response = self.client.get_image("image_does_not_exists")
|
||||
response = self.client.images.get("image_does_not_exists")
|
||||
collections.deque(response)
|
||||
|
||||
def test_export_image(self):
|
||||
def test_save_image(self):
|
||||
"""Export Image"""
|
||||
self.client.pull(constant.BB)
|
||||
image = self.client.get_image(constant.BB)
|
||||
image = self.client.images.pull(constant.BB)
|
||||
|
||||
file = os.path.join(TestImages.podman.image_cache, "busybox.tar")
|
||||
with open(file, mode="wb") as tarball:
|
||||
for frame in image:
|
||||
for frame in image.save(named=True):
|
||||
tarball.write(frame)
|
||||
sz = os.path.getsize(file)
|
||||
self.assertGreater(sz, 0)
|
||||
|
||||
def test_import_image(self):
|
||||
def test_load_image(self):
|
||||
"""Import|Load Image"""
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 1)
|
||||
self.assertEqual(len(self.client.images.list()), 1)
|
||||
|
||||
file = os.path.join(TestImages.podman.image_cache, constant.ALPINE_TARBALL)
|
||||
self.client.import_image_from_file(filename=file)
|
||||
image = self.client.images.pull(constant.BB)
|
||||
file = os.path.join(TestImages.podman.image_cache, "busybox.tar")
|
||||
with open(file, mode="wb") as tarball:
|
||||
for frame in image.save():
|
||||
tarball.write(frame)
|
||||
|
||||
all_images = self.client.images()
|
||||
self.assertEqual(len(all_images), 2)
|
||||
with open(file, mode="rb") as saved:
|
||||
_ = self.client.images.load(saved)
|
||||
|
||||
self.assertEqual(len(self.client.images.list()), 2)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -3,7 +3,7 @@ import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from docker import APIClient
|
||||
from docker import DockerClient
|
||||
|
||||
from test.python.docker import Podman, common, constant
|
||||
|
||||
@ -15,7 +15,7 @@ class TestSystem(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.client = APIClient(base_url="tcp://127.0.0.1:8080", timeout=15)
|
||||
self.client = DockerClient(base_url="tcp://127.0.0.1:8080", timeout=15)
|
||||
|
||||
TestSystem.podman.restore_image_from_cache(self.client)
|
||||
TestSystem.topContainerId = common.run_top_container(self.client)
|
||||
@ -58,9 +58,10 @@ class TestSystem(unittest.TestCase):
|
||||
def test_info_container_details(self):
|
||||
info = self.client.info()
|
||||
self.assertEqual(info["Containers"], 1)
|
||||
self.client.create_container(image=constant.ALPINE)
|
||||
self.client.containers.create(image=constant.ALPINE)
|
||||
info = self.client.info()
|
||||
self.assertEqual(info["Containers"], 2)
|
||||
|
||||
def test_version(self):
|
||||
self.assertIsNotNone(self.client.version())
|
||||
version = self.client.version()
|
||||
self.assertIsNotNone(version["Platform"]["Name"])
|
||||
|
Reference in New Issue
Block a user