Break up python APIv2 tests

* Tests broken up into areas of concern
* Introduced fixtures to reduce duplicated code
* Introduced new assert methods with APITestCase
* General cleanup of code while visiting
* Tests now targeting quay.io

Known issues:
* is-official against quay.io not working

Fixes: #9238
Signed-off-by: Jhon Honce <jhonce@redhat.com>
This commit is contained in:
Jhon Honce
2021-05-17 17:11:50 -07:00
parent 93c3e03227
commit 98955bedbc
15 changed files with 865 additions and 749 deletions

View File

View File

@ -0,0 +1,3 @@
from .api_testcase import APITestCase
__all__ = ["APITestCase"]

View File

@ -0,0 +1,103 @@
import json
import subprocess
import unittest
import requests
import sys
import time
from .podman import Podman
class APITestCase(unittest.TestCase):
PODMAN_URL = "http://localhost:8080"
podman = None # initialized podman configuration for tests
service = None # podman service instance
@classmethod
def setUpClass(cls):
super().setUpClass()
APITestCase.podman = Podman()
APITestCase.service = APITestCase.podman.open(
"system", "service", "tcp:localhost:8080", "--time=0"
)
# give the service some time to be ready...
time.sleep(2)
returncode = APITestCase.service.poll()
if returncode is not None:
raise subprocess.CalledProcessError(returncode, "podman system service")
r = requests.post(
APITestCase.uri("/images/pull?reference=quay.io%2Flibpod%2Falpine%3Alatest")
)
if r.status_code != 200:
raise subprocess.CalledProcessError(
r.status_code, f"podman images pull quay.io/libpod/alpine:latest {r.text}"
)
@classmethod
def tearDownClass(cls):
APITestCase.service.terminate()
stdout, stderr = APITestCase.service.communicate(timeout=0.5)
if stdout:
sys.stdout.write("\nService Stdout:\n" + stdout.decode("utf-8"))
if stderr:
sys.stderr.write("\nService Stderr:\n" + stderr.decode("utf-8"))
return super().tearDownClass()
def setUp(self):
super().setUp()
APITestCase.podman.run("run", "alpine", "/bin/ls", check=True)
def tearDown(self) -> None:
APITestCase.podman.run("pod", "rm", "--all", "--force", check=True)
APITestCase.podman.run("rm", "--all", "--force", check=True)
super().tearDown()
@property
def podman_url(self):
return "http://localhost:8080"
@staticmethod
def uri(path):
return APITestCase.PODMAN_URL + "/v2.0.0/libpod" + path
def resolve_container(self, path):
"""Find 'first' container and return 'Id' formatted into given URI path."""
try:
r = requests.get(self.uri("/containers/json?all=true"))
containers = r.json()
except Exception as e:
msg = f"Bad container response: {e}"
if r is not None:
msg += ": " + r.text
raise self.failureException(msg)
return path.format(containers[0]["Id"])
def assertContainerExists(self, member, msg=None): # pylint: disable=invalid-name
r = requests.get(self.uri(f"/containers/{member}/exists"))
if r.status_code == 404:
if msg is None:
msg = f"Container '{member}' does not exist."
self.failureException(msg)
def assertContainerNotExists(self, member, msg=None): # pylint: disable=invalid-name
r = requests.get(self.uri(f"/containers/{member}/exists"))
if r.status_code == 204:
if msg is None:
msg = f"Container '{member}' exists."
self.failureException(msg)
def assertId(self, content): # pylint: disable=invalid-name
objects = json.loads(content)
try:
if isinstance(objects, dict):
_ = objects["Id"]
else:
for item in objects:
_ = item["Id"]
except KeyError:
self.failureException("Failed in find 'Id' in return value.")

View File

@ -0,0 +1,136 @@
import configparser
import json
import os
import shutil
import subprocess
import sys
import tempfile
class Podman:
"""
Instances hold the configuration and setup for running podman commands
"""
def __init__(self):
"""Initialize a Podman instance with global options"""
binary = os.getenv("PODMAN", "bin/podman")
self.cmd = [binary, "--storage-driver=vfs"]
cgroupfs = os.getenv("CGROUP_MANAGER", "systemd")
self.cmd.append(f"--cgroup-manager={cgroupfs}")
if os.getenv("DEBUG"):
self.cmd.append("--log-level=debug")
self.cmd.append("--syslog=true")
self.anchor_directory = tempfile.mkdtemp(prefix="podman_restapi_")
self.cmd.append("--root=" + os.path.join(self.anchor_directory, "crio"))
self.cmd.append("--runroot=" + os.path.join(self.anchor_directory, "crio-run"))
os.environ["CONTAINERS_REGISTRIES_CONF"] = os.path.join(
self.anchor_directory, "registry.conf"
)
p = configparser.ConfigParser()
p.read_dict(
{
"registries.search": {"registries": "['quay.io']"},
"registries.insecure": {"registries": "[]"},
"registries.block": {"registries": "[]"},
}
)
with open(os.environ["CONTAINERS_REGISTRIES_CONF"], "w") as w:
p.write(w)
os.environ["CNI_CONFIG_PATH"] = os.path.join(self.anchor_directory, "cni", "net.d")
os.makedirs(os.environ["CNI_CONFIG_PATH"], exist_ok=True)
self.cmd.append("--cni-config-dir=" + os.environ["CNI_CONFIG_PATH"])
cni_cfg = os.path.join(os.environ["CNI_CONFIG_PATH"], "87-podman-bridge.conflist")
# json decoded and encoded to ensure legal json
buf = json.loads(
"""
{
"cniVersion": "0.3.0",
"name": "podman",
"plugins": [{
"type": "bridge",
"bridge": "cni0",
"isGateway": true,
"ipMasq": true,
"ipam": {
"type": "host-local",
"subnet": "10.88.0.0/16",
"routes": [{
"dst": "0.0.0.0/0"
}]
}
},
{
"type": "portmap",
"capabilities": {
"portMappings": true
}
}
]
}
"""
)
with open(cni_cfg, "w") as w:
json.dump(buf, w)
def open(self, command, *args, **kwargs):
"""Podman initialized instance to run a given command
:param self: Podman instance
:param command: podman sub-command to run
:param args: arguments and options for command
:param kwargs: See subprocess.Popen() for shell keyword
:return: subprocess.Popen() instance configured to run podman instance
"""
cmd = self.cmd.copy()
cmd.append(command)
cmd.extend(args)
shell = kwargs.get("shell", False)
return subprocess.Popen(
cmd,
shell=shell,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def run(self, command, *args, **kwargs):
"""Run given podman command
:param self: Podman instance
:param command: podman sub-command to run
:param args: arguments and options for command
:param kwargs: See subprocess.Popen() for shell and check keywords
:return: subprocess.Popen() instance configured to run podman instance
"""
cmd = self.cmd.copy()
cmd.append(command)
cmd.extend(args)
check = kwargs.get("check", False)
shell = kwargs.get("shell", False)
try:
return subprocess.run(
cmd,
shell=shell,
check=check,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except subprocess.CalledProcessError as e:
if e.stdout:
sys.stdout.write("\nRun Stdout:\n" + e.stdout.decode("utf-8"))
if e.stderr:
sys.stderr.write("\nRun Stderr:\n" + e.stderr.decode("utf-8"))
raise
def tear_down(self):
shutil.rmtree(self.anchor_directory, ignore_errors=True)

View File

@ -0,0 +1,192 @@
import random
import unittest
import requests
from dateutil.parser import parse
from .fixtures import APITestCase
class ContainerTestCase(APITestCase):
def test_list(self):
r = requests.get(self.uri("/containers/json"), timeout=5)
self.assertEqual(r.status_code, 200, r.text)
obj = r.json()
self.assertEqual(len(obj), 0)
def test_list_all(self):
r = requests.get(self.uri("/containers/json?all=true"))
self.assertEqual(r.status_code, 200, r.text)
self.assertId(r.content)
def test_inspect(self):
r = requests.get(self.uri(self.resolve_container("/containers/{}/json")))
self.assertEqual(r.status_code, 200, r.text)
self.assertId(r.content)
_ = parse(r.json()["Created"])
def test_stats(self):
r = requests.get(self.uri(self.resolve_container("/containers/{}/stats?stream=false")))
self.assertIn(r.status_code, (200, 409), r.text)
if r.status_code == 200:
self.assertId(r.content)
def test_delete(self):
r = requests.delete(self.uri(self.resolve_container("/containers/{}")))
self.assertEqual(r.status_code, 204, r.text)
def test_stop(self):
r = requests.post(self.uri(self.resolve_container("/containers/{}/start")))
self.assertIn(r.status_code, (204, 304), r.text)
r = requests.post(self.uri(self.resolve_container("/containers/{}/stop")))
self.assertIn(r.status_code, (204, 304), r.text)
def test_start(self):
r = requests.post(self.uri(self.resolve_container("/containers/{}/stop")))
self.assertIn(r.status_code, (204, 304), r.text)
r = requests.post(self.uri(self.resolve_container("/containers/{}/start")))
self.assertIn(r.status_code, (204, 304), r.text)
def test_restart(self):
r = requests.post(self.uri(self.resolve_container("/containers/{}/start")))
self.assertIn(r.status_code, (204, 304), r.text)
r = requests.post(self.uri(self.resolve_container("/containers/{}/restart")), timeout=5)
self.assertEqual(r.status_code, 204, r.text)
def test_resize(self):
r = requests.post(self.uri(self.resolve_container("/containers/{}/resize?h=43&w=80")))
self.assertIn(r.status_code, (200, 409), r.text)
if r.status_code == 200:
self.assertEqual(r.text, "", r.text)
def test_attach(self):
self.skipTest("FIXME: Test timeouts")
r = requests.post(self.uri(self.resolve_container("/containers/{}/attach")), timeout=5)
self.assertIn(r.status_code, (101, 500), r.text)
def test_logs(self):
r = requests.get(self.uri(self.resolve_container("/containers/{}/logs?stdout=true")))
self.assertEqual(r.status_code, 200, r.text)
def test_commit(self):
r = requests.post(self.uri(self.resolve_container("/commit?container={}")))
self.assertEqual(r.status_code, 200, r.text)
self.assertId(r.content)
obj = r.json()
self.assertIsInstance(obj, dict)
def test_prune(self):
name = f"Container_{random.getrandbits(160):x}"
r = requests.post(
self.podman_url + f"/v1.40/containers/create?name={name}",
json={
"Cmd": ["cp", "/etc/motd", "/motd.size_test"],
"Image": "alpine:latest",
"NetworkDisabled": True,
},
)
self.assertEqual(r.status_code, 201, r.text)
create = r.json()
r = requests.post(self.podman_url + f"/v1.40/containers/{create['Id']}/start")
self.assertEqual(r.status_code, 204, r.text)
r = requests.post(self.podman_url + f"/v1.40/containers/{create['Id']}/wait")
self.assertEqual(r.status_code, 200, r.text)
wait = r.json()
self.assertEqual(wait["StatusCode"], 0, wait["Error"]["Message"])
prune = requests.post(self.podman_url + "/v1.40/containers/prune")
self.assertEqual(prune.status_code, 200, prune.status_code)
prune_payload = prune.json()
self.assertGreater(prune_payload["SpaceReclaimed"], 0)
self.assertIn(create["Id"], prune_payload["ContainersDeleted"])
# Delete any orphaned containers
r = requests.get(self.podman_url + "/v1.40/containers/json?all=true")
self.assertEqual(r.status_code, 200, r.text)
for self.resolve_container in r.json():
requests.delete(
self.podman_url + f"/v1.40/containers/{self.resolve_container['Id']}?force=true"
)
# Image prune here tied to containers freeing up
prune = requests.post(self.podman_url + "/v1.40/images/prune")
self.assertEqual(prune.status_code, 200, prune.text)
prune_payload = prune.json()
self.assertGreater(prune_payload["SpaceReclaimed"], 0)
# FIXME need method to determine which image is going to be "pruned" to fix test
# TODO should handler be recursive when deleting images?
# self.assertIn(img["Id"], prune_payload["ImagesDeleted"][1]["Deleted"])
# FIXME (@vrothberg): I commented this line out during the `libimage` migration.
# It doesn't make sense to report anything to be deleted if the reclaimed space
# is zero. I think the test needs some rewrite.
# self.assertIsNotNone(prune_payload["ImagesDeleted"][1]["Deleted"])
def test_status(self):
r = requests.post(
self.podman_url + "/v1.40/containers/create?name=topcontainer",
json={"Cmd": ["top"], "Image": "alpine:latest"},
)
self.assertEqual(r.status_code, 201, r.text)
payload = r.json()
container_id = payload["Id"]
self.assertIsNotNone(container_id)
r = requests.get(
self.podman_url + "/v1.40/containers/json",
params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'},
)
self.assertEqual(r.status_code, 200, r.text)
payload = r.json()
self.assertEqual(payload[0]["Status"], "Created")
r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/start")
self.assertEqual(r.status_code, 204, r.text)
r = requests.get(
self.podman_url + "/v1.40/containers/json",
params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'},
)
self.assertEqual(r.status_code, 200, r.text)
payload = r.json()
self.assertTrue(str(payload[0]["Status"]).startswith("Up"))
r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/pause")
self.assertEqual(r.status_code, 204, r.text)
r = requests.get(
self.podman_url + "/v1.40/containers/json",
params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'},
)
self.assertEqual(r.status_code, 200, r.text)
payload = r.json()
self.assertTrue(str(payload[0]["Status"]).startswith("Up"))
self.assertTrue(str(payload[0]["Status"]).endswith("(Paused)"))
r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/unpause")
self.assertEqual(r.status_code, 204, r.text)
r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/stop")
self.assertEqual(r.status_code, 204, r.text)
r = requests.get(
self.podman_url + "/v1.40/containers/json",
params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'},
)
self.assertEqual(r.status_code, 200, r.text)
payload = r.json()
self.assertTrue(str(payload[0]["Status"]).startswith("Exited"))
r = requests.delete(self.podman_url + f"/v1.40/containers/{container_id}")
self.assertEqual(r.status_code, 204, r.text)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,165 @@
import json
import unittest
from multiprocessing import Process
import requests
from dateutil.parser import parse
from .fixtures import APITestCase
class ImageTestCase(APITestCase):
def test_list(self):
r = requests.get(self.podman_url + "/v1.40/images/json")
self.assertEqual(r.status_code, 200, r.text)
# See https://docs.docker.com/engine/api/v1.40/#operation/ImageList
required_keys = (
"Id",
"ParentId",
"RepoTags",
"RepoDigests",
"Created",
"Size",
"SharedSize",
"VirtualSize",
"Labels",
"Containers",
)
images = r.json()
self.assertIsInstance(images, list)
for item in images:
self.assertIsInstance(item, dict)
for k in required_keys:
self.assertIn(k, item)
def test_inspect(self):
r = requests.get(self.podman_url + "/v1.40/images/alpine/json")
self.assertEqual(r.status_code, 200, r.text)
# See https://docs.docker.com/engine/api/v1.40/#operation/ImageInspect
required_keys = (
"Id",
"Parent",
"Comment",
"Created",
"Container",
"DockerVersion",
"Author",
"Architecture",
"Os",
"Size",
"VirtualSize",
"GraphDriver",
"RootFS",
"Metadata",
)
image = r.json()
self.assertIsInstance(image, dict)
for item in required_keys:
self.assertIn(item, image)
_ = parse(image["Created"])
def test_delete(self):
r = requests.delete(self.podman_url + "/v1.40/images/alpine?force=true")
self.assertEqual(r.status_code, 200, r.text)
self.assertIsInstance(r.json(), list)
def test_pull(self):
r = requests.post(self.uri("/images/pull?reference=alpine"), timeout=15)
self.assertEqual(r.status_code, 200, r.status_code)
text = r.text
keys = {
"error": False,
"id": False,
"images": False,
"stream": False,
}
# Read and record stanza's from pull
for line in str.splitlines(text):
obj = json.loads(line)
key_list = list(obj.keys())
for k in key_list:
keys[k] = True
self.assertFalse(keys["error"], "Expected no errors")
self.assertTrue(keys["id"], "Expected to find id stanza")
self.assertTrue(keys["images"], "Expected to find images stanza")
self.assertTrue(keys["stream"], "Expected to find stream progress stanza's")
def test_search_compat(self):
url = self.podman_url + "/v1.40/images/search"
# Had issues with this test hanging when repositories not happy
def do_search1():
payload = {"term": "alpine"}
r = requests.get(url, params=payload, timeout=5)
self.assertEqual(r.status_code, 200, f"#1: {r.text}")
self.assertIsInstance(r.json(), list)
def do_search2():
payload = {"term": "alpine", "limit": 1}
r = requests.get(url, params=payload, timeout=5)
self.assertEqual(r.status_code, 200, f"#2: {r.text}")
results = r.json()
self.assertIsInstance(results, list)
self.assertEqual(len(results), 1)
def do_search3():
# FIXME: Research if quay.io supports is-official and which image is "official"
return
payload = {"term": "thanos", "filters": '{"is-official":["true"]}'}
r = requests.get(url, params=payload, timeout=5)
self.assertEqual(r.status_code, 200, f"#3: {r.text}")
results = r.json()
self.assertIsInstance(results, list)
# There should be only one official image
self.assertEqual(len(results), 1)
def do_search4():
headers = {"X-Registry-Auth": "null"}
payload = {"term": "alpine"}
r = requests.get(url, params=payload, headers=headers, timeout=5)
self.assertEqual(r.status_code, 200, f"#4: {r.text}")
def do_search5():
headers = {"X-Registry-Auth": "invalid value"}
payload = {"term": "alpine"}
r = requests.get(url, params=payload, headers=headers, timeout=5)
self.assertEqual(r.status_code, 400, f"#5: {r.text}")
i = 1
for fn in [do_search1, do_search2, do_search3, do_search4, do_search5]:
with self.subTest(i=i):
search = Process(target=fn)
search.start()
search.join(timeout=10)
self.assertFalse(search.is_alive(), f"#{i} /images/search took too long")
# search_methods = [do_search1, do_search2, do_search3, do_search4, do_search5]
# for search_method in search_methods:
# search = Process(target=search_method)
# search.start()
# search.join(timeout=10)
# self.assertFalse(search.is_alive(), "/images/search took too long")
def test_history(self):
r = requests.get(self.podman_url + "/v1.40/images/alpine/history")
self.assertEqual(r.status_code, 200, r.text)
# See https://docs.docker.com/engine/api/v1.40/#operation/ImageHistory
required_keys = ("Id", "Created", "CreatedBy", "Tags", "Size", "Comment")
changes = r.json()
self.assertIsInstance(changes, list)
for change in changes:
self.assertIsInstance(change, dict)
for k in required_keys:
self.assertIn(k, change)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,14 @@
import unittest
import requests
from .fixtures import APITestCase
class ManifestTestCase(APITestCase):
def test_manifest_409(self):
r = requests.post(self.uri("/manifests/create"), params={"name": "ThisIsAnInvalidImage"})
self.assertEqual(r.status_code, 400, r.text)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,155 @@
import random
import unittest
import requests
from .fixtures import APITestCase
class NetworkTestCase(APITestCase):
# TODO Need to support Docker-py order of network/container creates
def test_connect(self):
"""Create network and container then connect to network"""
net_default = requests.post(
self.podman_url + "/v1.40/networks/create", json={"Name": "TestDefaultNetwork"}
)
self.assertEqual(net_default.status_code, 201, net_default.text)
create = requests.post(
self.podman_url + "/v1.40/containers/create?name=postCreateConnect",
json={
"Cmd": ["top"],
"Image": "alpine:latest",
"NetworkDisabled": False,
# FIXME adding these 2 lines cause: (This is sampled from docker-py)
# "network already exists","message":"container
# 01306e499df5441560d70071a54342611e422a94de20865add50a9565fd79fb9 is already connected to CNI
# network \"TestDefaultNetwork\": network already exists"
# "HostConfig": {"NetworkMode": "TestDefaultNetwork"},
# "NetworkingConfig": {"EndpointsConfig": {"TestDefaultNetwork": None}},
# FIXME These two lines cause:
# CNI network \"TestNetwork\" not found","message":"error configuring network namespace for container
# 369ddfa7d3211ebf1fbd5ddbff91bd33fa948858cea2985c133d6b6507546dff: CNI network \"TestNetwork\" not
# found"
# "HostConfig": {"NetworkMode": "TestNetwork"},
# "NetworkingConfig": {"EndpointsConfig": {"TestNetwork": None}},
# FIXME no networking defined cause: (note this error is from the container inspect below)
# "internal libpod error","message":"network inspection mismatch: asked to join 2 CNI network(s) [
# TestDefaultNetwork podman], but have information on 1 network(s): internal libpod error"
},
)
self.assertEqual(create.status_code, 201, create.text)
self.assertId(create.content)
payload = create.json()
start = requests.post(self.podman_url + f"/v1.40/containers/{payload['Id']}/start")
self.assertEqual(start.status_code, 204, start.text)
connect = requests.post(
self.podman_url + "/v1.40/networks/TestDefaultNetwork/connect",
json={"Container": payload["Id"]},
)
self.assertEqual(connect.status_code, 200, connect.text)
self.assertEqual(connect.text, "OK\n")
inspect = requests.get(f"{self.podman_url}/v1.40/containers/{payload['Id']}/json")
self.assertEqual(inspect.status_code, 200, inspect.text)
payload = inspect.json()
self.assertFalse(payload["Config"].get("NetworkDisabled", False))
self.assertEqual(
"TestDefaultNetwork",
payload["NetworkSettings"]["Networks"]["TestDefaultNetwork"]["NetworkID"],
)
# TODO restore this to test, when joining multiple networks possible
# self.assertEqual(
# "TestNetwork",
# payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"],
# )
# TODO Need to support network aliases
# self.assertIn(
# "test_post_create",
# payload["NetworkSettings"]["Networks"]["TestNetwork"]["Aliases"],
# )
def test_create(self):
"""Create network and connect container during create"""
net = requests.post(
self.podman_url + "/v1.40/networks/create", json={"Name": "TestNetwork"}
)
self.assertEqual(net.status_code, 201, net.text)
create = requests.post(
self.podman_url + "/v1.40/containers/create?name=postCreate",
json={
"Cmd": ["date"],
"Image": "alpine:latest",
"NetworkDisabled": False,
"HostConfig": {"NetworkMode": "TestNetwork"},
},
)
self.assertEqual(create.status_code, 201, create.text)
self.assertId(create.content)
payload = create.json()
inspect = requests.get(f"{self.podman_url}/v1.40/containers/{payload['Id']}/json")
self.assertEqual(inspect.status_code, 200, inspect.text)
payload = inspect.json()
self.assertFalse(payload["Config"].get("NetworkDisabled", False))
self.assertEqual(
"TestNetwork",
payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"],
)
def test_crud(self):
name = f"Network_{random.getrandbits(160):x}"
# Cannot test for 0 existing networks because default "podman" network always exists
create = requests.post(self.podman_url + "/v1.40/networks/create", json={"Name": name})
self.assertEqual(create.status_code, 201, create.text)
self.assertId(create.content)
net = create.json()
self.assertIsInstance(net, dict)
self.assertNotEqual(net["Id"], name)
ident = net["Id"]
ls = requests.get(self.podman_url + "/v1.40/networks")
self.assertEqual(ls.status_code, 200, ls.text)
networks = ls.json()
self.assertIsInstance(networks, list)
found = False
for net in networks:
if net["Name"] == name:
found = True
break
self.assertTrue(found, f"Network '{name}' not found")
inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}")
self.assertEqual(inspect.status_code, 200, inspect.text)
self.assertIsInstance(inspect.json(), dict)
inspect = requests.delete(self.podman_url + f"/v1.40/networks/{ident}")
self.assertEqual(inspect.status_code, 204, inspect.text)
inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}")
self.assertEqual(inspect.status_code, 404, inspect.text)
# network prune
prune_name = f"Network_{random.getrandbits(160):x}"
prune_create = requests.post(
self.podman_url + "/v1.40/networks/create", json={"Name": prune_name}
)
self.assertEqual(create.status_code, 201, prune_create.text)
prune = requests.post(self.podman_url + "/v1.40/networks/prune")
self.assertEqual(prune.status_code, 200, prune.text)
self.assertTrue(prune_name in prune.json()["NetworksDeleted"])
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,65 @@
import random
import unittest
import requests
from .fixtures import APITestCase
class TestApi(APITestCase):
def test_pod_start_conflict(self):
"""Verify issue #8865"""
pod_name = list()
pod_name.append(f"Pod_{random.getrandbits(160):x}")
pod_name.append(f"Pod_{random.getrandbits(160):x}")
r = requests.post(
self.uri("/pods/create"),
json={
"name": pod_name[0],
"no_infra": False,
"portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}],
},
)
self.assertEqual(r.status_code, 201, r.text)
r = requests.post(
self.uri("/containers/create"),
json={
"pod": pod_name[0],
"image": "quay.io/libpod/alpine:latest",
"command": ["top"],
},
)
self.assertEqual(r.status_code, 201, r.text)
r = requests.post(
self.uri("/pods/create"),
json={
"name": pod_name[1],
"no_infra": False,
"portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}],
},
)
self.assertEqual(r.status_code, 201, r.text)
r = requests.post(
self.uri("/containers/create"),
json={
"pod": pod_name[1],
"image": "quay.io/libpod/alpine:latest",
"command": ["top"],
},
)
self.assertEqual(r.status_code, 201, r.text)
r = requests.post(self.uri(f"/pods/{pod_name[0]}/start"))
self.assertEqual(r.status_code, 200, r.text)
r = requests.post(self.uri(f"/pods/{pod_name[1]}/start"))
self.assertEqual(r.status_code, 409, r.text)
start = r.json()
self.assertGreater(len(start["Errs"]), 0, r.text)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,88 @@
import json
import unittest
import requests
from .fixtures import APITestCase
class SystemTestCase(APITestCase):
def test_info(self):
r = requests.get(self.uri("/info"))
self.assertEqual(r.status_code, 200, r.text)
self.assertIsNotNone(r.content)
_ = r.json()
r = requests.get(self.podman_url + "/v1.40/info")
self.assertEqual(r.status_code, 200, r.text)
self.assertIsNotNone(r.content)
_ = r.json()
def test_events(self):
r = requests.get(self.uri("/events?stream=false"))
self.assertEqual(r.status_code, 200, r.text)
self.assertIsNotNone(r.content)
report = r.text.splitlines()
self.assertGreater(len(report), 0, "No events found!")
for line in report:
obj = json.loads(line)
# Actor.ID is uppercase for compatibility
self.assertIn("ID", obj["Actor"])
def test_ping(self):
required_headers = (
"API-Version",
"Builder-Version",
"Docker-Experimental",
"Cache-Control",
"Pragma",
"Pragma",
)
def check_headers(req):
for k in required_headers:
self.assertIn(k, req.headers)
r = requests.get(self.podman_url + "/_ping")
self.assertEqual(r.status_code, 200, r.text)
self.assertEqual(r.text, "OK")
check_headers(r)
r = requests.head(self.podman_url + "/_ping")
self.assertEqual(r.status_code, 200, r.text)
self.assertEqual(r.text, "")
check_headers(r)
r = requests.get(self.uri("/_ping"))
self.assertEqual(r.status_code, 200, r.text)
self.assertEqual(r.text, "OK")
check_headers(r)
r = requests.head(self.uri("/_ping"))
self.assertEqual(r.status_code, 200, r.text)
self.assertEqual(r.text, "")
check_headers(r)
def test_version(self):
r = requests.get(self.podman_url + "/v1.40/version")
self.assertEqual(r.status_code, 200, r.text)
r = requests.get(self.uri("/version"))
self.assertEqual(r.status_code, 200, r.text)
def test_df(self):
r = requests.get(self.podman_url + "/v1.40/system/df")
self.assertEqual(r.status_code, 200, r.text)
obj = r.json()
self.assertIn("Images", obj)
self.assertIn("Containers", obj)
self.assertIn("Volumes", obj)
self.assertIn("BuildCache", obj)
r = requests.get(self.uri("/system/df"))
self.assertEqual(r.status_code, 200, r.text)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,75 @@
import os
import random
import unittest
import requests
from .fixtures import APITestCase
class VolumeTestCase(APITestCase):
def test_volume(self):
name = f"Volume_{random.getrandbits(160):x}"
ls = requests.get(self.podman_url + "/v1.40/volumes")
self.assertEqual(ls.status_code, 200, ls.text)
# See https://docs.docker.com/engine/api/v1.40/#operation/VolumeList
required_keys = (
"Volumes",
"Warnings",
)
volumes = ls.json()
self.assertIsInstance(volumes, dict)
for key in required_keys:
self.assertIn(key, volumes)
create = requests.post(self.podman_url + "/v1.40/volumes/create", json={"Name": name})
self.assertEqual(create.status_code, 201, create.text)
# See https://docs.docker.com/engine/api/v1.40/#operation/VolumeCreate
# and https://docs.docker.com/engine/api/v1.40/#operation/VolumeInspect
required_keys = (
"Name",
"Driver",
"Mountpoint",
"Labels",
"Scope",
"Options",
)
volume = create.json()
self.assertIsInstance(volume, dict)
for k in required_keys:
self.assertIn(k, volume)
self.assertEqual(volume["Name"], name)
inspect = requests.get(self.podman_url + f"/v1.40/volumes/{name}")
self.assertEqual(inspect.status_code, 200, inspect.text)
volume = inspect.json()
self.assertIsInstance(volume, dict)
for k in required_keys:
self.assertIn(k, volume)
rm = requests.delete(self.podman_url + f"/v1.40/volumes/{name}")
self.assertEqual(rm.status_code, 204, rm.text)
# recreate volume with data and then prune it
r = requests.post(self.podman_url + "/v1.40/volumes/create", json={"Name": name})
self.assertEqual(create.status_code, 201, create.text)
create = r.json()
with open(os.path.join(create["Mountpoint"], "test_prune"), "w") as file:
file.writelines(["This is a test\n", "This is a good test\n"])
prune = requests.post(self.podman_url + "/v1.40/volumes/prune")
self.assertEqual(prune.status_code, 200, prune.text)
payload = prune.json()
self.assertIn(name, payload["VolumesDeleted"])
self.assertGreater(payload["SpaceReclaimed"], 0)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,238 @@
import json
import os
import shlex
import signal
import string
import subprocess
import sys
import time
import unittest
from collections.abc import Iterable
from multiprocessing import Process
import requests
from dateutil.parser import parse
PODMAN_URL = "http://localhost:8080"
def _url(path):
return PODMAN_URL + "/v1.0.0/libpod" + path
def podman():
binary = os.getenv("PODMAN_BINARY")
if binary is None:
binary = "bin/podman"
return binary
def ctnr(path):
r = requests.get(_url("/containers/json?all=true"))
try:
ctnrs = json.loads(r.text)
except Exception as e:
sys.stderr.write("Bad container response: {}/{}".format(r.text, e))
raise e
return path.format(ctnrs[0]["Id"])
class TestApi(unittest.TestCase):
podman = None
def setUp(self):
super().setUp()
if TestApi.podman.poll() is not None:
sys.stderr.write("podman service returned {}", TestApi.podman.returncode)
sys.exit(2)
requests.get(_url("/images/create?fromSrc=quay.io%2Flibpod%2Falpine%3Alatest"))
# calling out to podman is easier than the API for running a container
subprocess.run(
[podman(), "run", "alpine", "/bin/ls"],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
@classmethod
def setUpClass(cls):
super().setUpClass()
TestApi.podman = subprocess.Popen(
[
podman(),
"system",
"service",
"tcp:localhost:8080",
"--log-level=debug",
"--time=0",
],
shell=False,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
time.sleep(2)
@classmethod
def tearDownClass(cls):
TestApi.podman.terminate()
stdout, stderr = TestApi.podman.communicate(timeout=0.5)
if stdout:
print("\nService Stdout:\n" + stdout.decode("utf-8"))
if stderr:
print("\nService Stderr:\n" + stderr.decode("utf-8"))
if TestApi.podman.returncode > 0:
sys.stderr.write("podman exited with error code {}\n".format(TestApi.podman.returncode))
sys.exit(2)
return super().tearDownClass()
def test_info(self):
r = requests.get(_url("/info"))
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(r.content)
_ = json.loads(r.text)
def test_events(self):
r = requests.get(_url("/events?stream=false"))
self.assertEqual(r.status_code, 200, r.text)
self.assertIsNotNone(r.content)
for line in r.text.splitlines():
obj = json.loads(line)
# Actor.ID is uppercase for compatibility
_ = obj["Actor"]["ID"]
def test_containers(self):
r = requests.get(_url("/containers/json"), timeout=5)
self.assertEqual(r.status_code, 200, r.text)
obj = json.loads(r.text)
self.assertEqual(len(obj), 0)
def test_containers_all(self):
r = requests.get(_url("/containers/json?all=true"))
self.assertEqual(r.status_code, 200, r.text)
self.validateObjectFields(r.text)
def test_inspect_container(self):
r = requests.get(_url(ctnr("/containers/{}/json")))
self.assertEqual(r.status_code, 200, r.text)
obj = self.validateObjectFields(r.content)
_ = parse(obj["Created"])
def test_stats(self):
r = requests.get(_url(ctnr("/containers/{}/stats?stream=false")))
self.assertIn(r.status_code, (200, 409), r.text)
if r.status_code == 200:
self.validateObjectFields(r.text)
def test_delete_containers(self):
r = requests.delete(_url(ctnr("/containers/{}")))
self.assertEqual(r.status_code, 204, r.text)
def test_stop_containers(self):
r = requests.post(_url(ctnr("/containers/{}/start")))
self.assertIn(r.status_code, (204, 304), r.text)
r = requests.post(_url(ctnr("/containers/{}/stop")))
self.assertIn(r.status_code, (204, 304), r.text)
def test_start_containers(self):
r = requests.post(_url(ctnr("/containers/{}/stop")))
self.assertIn(r.status_code, (204, 304), r.text)
r = requests.post(_url(ctnr("/containers/{}/start")))
self.assertIn(r.status_code, (204, 304), r.text)
def test_restart_containers(self):
r = requests.post(_url(ctnr("/containers/{}/start")))
self.assertIn(r.status_code, (204, 304), r.text)
r = requests.post(_url(ctnr("/containers/{}/restart")), timeout=5)
self.assertEqual(r.status_code, 204, r.text)
def test_resize(self):
r = requests.post(_url(ctnr("/containers/{}/resize?h=43&w=80")))
self.assertIn(r.status_code, (200, 409), r.text)
if r.status_code == 200:
self.assertIsNone(r.text)
def test_attach_containers(self):
r = requests.post(_url(ctnr("/containers/{}/attach")))
self.assertIn(r.status_code, (101, 409), r.text)
def test_logs_containers(self):
r = requests.get(_url(ctnr("/containers/{}/logs?stdout=true")))
self.assertEqual(r.status_code, 200, r.text)
def test_post_create(self):
self.skipTest("TODO: create request body")
r = requests.post(_url("/containers/create?args=True"))
self.assertEqual(r.status_code, 200, r.text)
json.loads(r.text)
def test_commit(self):
r = requests.post(_url(ctnr("/commit?container={}")))
self.assertEqual(r.status_code, 200, r.text)
self.validateObjectFields(r.text)
def test_images(self):
r = requests.get(_url("/images/json"))
self.assertEqual(r.status_code, 200, r.text)
self.validateObjectFields(r.content)
def test_inspect_image(self):
r = requests.get(_url("/images/alpine/json"))
self.assertEqual(r.status_code, 200, r.text)
obj = self.validateObjectFields(r.content)
_ = parse(obj["Created"])
def test_delete_image(self):
r = requests.delete(_url("/images/alpine?force=true"))
self.assertEqual(r.status_code, 200, r.text)
json.loads(r.text)
def test_pull(self):
r = requests.post(_url("/images/pull?reference=alpine"), timeout=5)
self.assertEqual(r.status_code, 200, r.text)
json.loads(r.text)
def test_search(self):
# Had issues with this test hanging when repositories not happy
def do_search():
r = requests.get(_url("/images/search?term=alpine"), timeout=5)
self.assertEqual(r.status_code, 200, r.text)
json.loads(r.text)
search = Process(target=do_search)
search.start()
search.join(timeout=10)
self.assertFalse(search.is_alive(), "/images/search took too long")
def test_ping(self):
r = requests.get(PODMAN_URL + "/_ping")
self.assertEqual(r.status_code, 200, r.text)
r = requests.head(PODMAN_URL + "/_ping")
self.assertEqual(r.status_code, 200, r.text)
r = requests.get(_url("/_ping"))
self.assertEqual(r.status_code, 200, r.text)
r = requests.get(_url("/_ping"))
self.assertEqual(r.status_code, 200, r.text)
def validateObjectFields(self, buffer):
objs = json.loads(buffer)
if not isinstance(objs, dict):
for o in objs:
_ = o["Id"]
else:
_ = objs["Id"]
return objs
if __name__ == "__main__":
unittest.main()