Update python directories to better support setup.py

Signed-off-by: Jhon Honce <jhonce@redhat.com>
This commit is contained in:
Jhon Honce
2018-07-12 19:26:14 -07:00
parent 44b523c946
commit 74ccd9ce5f
54 changed files with 445 additions and 154 deletions

View File

@ -0,0 +1 @@
v0.1.0, 2018-05-11 -- Initial release.

View File

@ -0,0 +1,13 @@
Copyright 2018 Red Hat, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,2 @@
prune test/
include README.md

View File

@ -0,0 +1,21 @@
PYTHON ?= /usr/bin/python3
.PHONY: python-podman
python-podman:
$(PYTHON) setup.py bdist
.PHONY: integration
integration:
test/test_runner.sh
.PHONY: install
install:
$(PYTHON) setup.py install --user
.PHONY: clean
clean:
$(PYTHON) setup.py clean --all
pip3 uninstall podman ||:
rm -rf podman.egg-info dist
find . -depth -name __pycache__ -exec rm -rf {} \;
find . -depth -name \*.pyc -exec rm -f {} \;

View File

@ -0,0 +1,43 @@
# podman - pythonic library for working with varlink interface to Podman
## Status: Active Development
See [libpod](https://github.com/projectatomic/libpod)
## Releases
To build the podman egg:
```sh
cd ~/libpod/contrib/python
python3 setup.py clean -a && python3 setup.py bdist
```
## Code snippets/examples:
### Show images in storage
```python
import podman
with podman.Client() as client:
list(map(print, client.images.list()))
```
### Show containers created since midnight
```python
from datetime import datetime, time, timezone
import podman
midnight = datetime.combine(datetime.today(), time.min, tzinfo=timezone.utc)
with podman.Client() as client:
for c in client.containers.list():
created_at = podman.datetime_parse(c.createdat)
if created_at > midnight:
print('Container {}: image: {} created at: {}'.format(
c.id[:12], c.image[:32], podman.datetime_format(created_at)))
```

View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""Example: Run top on Alpine container."""
import podman
print('{}\n'.format(__doc__))
with podman.Client() as client:
id = client.images.pull('alpine:latest')
img = client.images.get(id)
cntr = img.create(detach=True, tty=True, command=['/usr/bin/top'])
cntr.attach(eot=4)
try:
cntr.start()
print()
except (BrokenPipeError, KeyboardInterrupt):
print('\nContainer disconnected.')

View File

@ -0,0 +1,16 @@
#!/usr/bin/env python3
"""Example: Show containers grouped by image id."""
from itertools import groupby
import podman
print('{}\n'.format(__doc__))
with podman.Client() as client:
ctnrs = sorted(client.containers.list(), key=lambda k: k.imageid)
for key, grp in groupby(ctnrs, key=lambda k: k.imageid):
print('Image: {}'.format(key))
for c in grp:
print(' : container: {} created at: {}'.format(
c.id[:12], podman.datetime_format(c.createdat)))

View File

@ -0,0 +1,10 @@
#!/usr/bin/env python3
"""Example: Show all images on system."""
import podman
print('{}\n'.format(__doc__))
with podman.Client() as client:
for img in client.images.list():
print(img)

View File

@ -0,0 +1,16 @@
#!/usr/bin/env python3
"""Example: Pull Fedora and inspect image and container."""
import podman
print('{}\n'.format(__doc__))
with podman.Client() as client:
id = client.images.pull('registry.fedoraproject.org/fedora:28')
img = client.images.get(id)
print(img.inspect())
cntr = img.create()
print(cntr.inspect())
cntr.remove()

View File

@ -0,0 +1,19 @@
#!/usr/bin/env python3
"""Example: Show all containers created since midnight."""
from datetime import datetime, time, timezone
import podman
print('{}\n'.format(__doc__))
midnight = datetime.combine(datetime.today(), time.min, tzinfo=timezone.utc)
with podman.Client() as client:
for c in client.containers.list():
created_at = podman.datetime_parse(c.createdat)
if created_at > midnight:
print('{}: image: {} createdAt: {}'.format(
c.id[:12], c.image[:32], podman.datetime_format(created_at)))

View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""Example: Create new image from container."""
import sys
import podman
def print_history(details):
"""Format history data from an image, in a table."""
for i, r in enumerate(details):
print(
'{}: {} {} {}'.format(i, r.id[:12],
podman.datetime_format(r.created), r.tags),
sep='\n')
print("-" * 25)
print('{}\n'.format(__doc__))
with podman.Client() as client:
ctnr = next(
(c for c in client.containers.list() if 'alpine' in c['image']), None)
if ctnr:
print_history(client.images.get(ctnr.imageid).history())
# Make changes as we save the container to a new image
id = ctnr.commit('alpine-ash', changes=['CMD=/bin/ash'])
print_history(client.images.get(id).history())
else:
print('Unable to find "alpine" container.', file=sys.stderr)

View File

@ -0,0 +1,43 @@
#!/bin/bash
export PYTHONPATH=..
function examples {
for file in $@; do
python3 -c "import ast; f=open('"${file}"'); t=ast.parse(f.read()); print(ast.get_docstring(t) + ' -- "${file}"')"
done
}
while getopts "lh" arg; do
case $arg in
l ) examples $(ls eg_*.py); exit 0 ;;
h ) echo 1>&2 $0 [-l] [-h] filename ; exit 2 ;;
esac
done
shift $((OPTIND-1))
# podman needs to play some games with resources
if [[ $(id -u) != 0 ]]; then
echo 1>&2 $0 must be run as root.
exit 2
fi
if ! systemctl --quiet is-active io.projectatomic.podman.socket; then
echo 1>&2 'podman is not running. systemctl enable --now io.projectatomic.podman.socket'
exit 1
fi
function cleanup {
podman rm $1 >/dev/null 2>&1
}
# Setup storage with an image and container
podman pull alpine:latest >/tmp/podman.output 2>&1
CTNR=$(podman create alpine)
trap "cleanup $CTNR" EXIT
if [[ -f $1 ]]; then
python3 $1
else
python3 $1.py
fi

View File

@ -44,11 +44,11 @@ class BaseClient(object):
raise ValueError('path is required for uri,'
' expected format "unix://path_to_socket"')
if kwargs.get('remote_uri') or kwargs.get('identity_file'):
if kwargs.get('remote_uri'):
# Remote access requires the full tuple of information
if kwargs.get('remote_uri') is None:
raise ValueError(
'remote is required,'
'remote_uri is required,'
' expected format "ssh://user@hostname/path_to_socket".')
remote = urlparse(kwargs['remote_uri'])
if remote.username is None:
@ -64,20 +64,16 @@ class BaseClient(object):
'hostname is required for remote_uri,'
' expected format "ssh://user@hostname/path_to_socket".')
if kwargs.get('identity_file') is None:
raise ValueError('identity_file is required.')
if not os.path.isfile(kwargs['identity_file']):
raise FileNotFoundError(
errno.ENOENT,
os.strerror(errno.ENOENT),
kwargs['identity_file'],
)
return RemoteClient(
Context(uri, interface, local_path, remote.path,
remote.username, remote.hostname,
kwargs['identity_file']))
Context(
uri,
interface,
local_path,
remote.path,
remote.username,
remote.hostname,
kwargs.get('identity_file'),
))
else:
return LocalClient(
Context(uri, interface, None, None, None, None, None))

View File

@ -97,26 +97,27 @@ class Tunnel(object):
def bore(self, id):
"""Create SSH tunnel from given context."""
ssh_opts = '-nNT'
cmd = ['ssh']
ssh_opts = '-fNT'
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
ssh_opts += 'v'
else:
ssh_opts += 'q'
cmd.append(ssh_opts)
cmd.extend(('-L', '{}:{}'.format(self.context.local_socket,
self.context.remote_socket)))
if self.context.identity_file:
cmd.extend(('-i', self.context.identity_file))
cmd.append('ssh://{}@{}'.format(self.context.username,
self.context.hostname))
cmd = [
'ssh',
ssh_opts,
'-L',
'{}:{}'.format(self.context.local_socket,
self.context.remote_socket),
'-i',
self.context.identity_file,
'ssh://{}@{}'.format(self.context.username, self.context.hostname),
]
logging.debug('Tunnel cmd "{}"'.format(' '.join(cmd)))
self._tunnel = subprocess.Popen(cmd, close_fds=True)
for i in range(10):
for i in range(300):
# TODO: Make timeout configurable
if os.path.exists(self.context.local_socket):
break

View File

@ -0,0 +1,3 @@
varlink>=26.1.0
setuptools>=39.2.0
python-dateutil>=2.7.3

View File

@ -0,0 +1,38 @@
import os
from setuptools import find_packages, setup
root = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(root, 'README.md')) as me:
readme = me.read()
with open(os.path.join(root, 'requirements.txt')) as r:
requirements = r.read().splitlines()
setup(
name='podman',
version=os.environ.get('PODMAN_VERSION', '0.0.0'),
description='A client for communicating with a Podman server',
long_description=readme,
author='Jhon Honce',
author_email='jhonce@redhat.com',
url='http://github.com/projectatomic/libpod',
license='Apache Software License',
python_requires='>=3',
include_package_data=True,
install_requires=requirements,
packages=find_packages(exclude=['test']),
zip_safe=True,
keywords='varlink libpod podman',
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'Topic :: Software Development',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 3.6',
])
# Not supported
# long_description_content_type='text/markdown',

View File

View File

@ -0,0 +1,107 @@
import contextlib
import functools
import itertools
import os
import subprocess
import time
import unittest
from varlink import VarlinkError
MethodNotImplemented = 'org.varlink.service.MethodNotImplemented'
class PodmanTestCase(unittest.TestCase):
"""Hide the sausage making of initializing storage."""
@classmethod
def setUpClass(cls):
if hasattr(PodmanTestCase, 'alpine_process'):
PodmanTestCase.tearDownClass()
def run_cmd(*args):
cmd = list(itertools.chain(*args))
try:
pid = subprocess.Popen(
cmd,
close_fds=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = pid.communicate()
except OSError as e:
print('{}: {}({})'.format(cmd, e.strerror, e.returncode))
except ValueError as e:
print('{}: {}'.format(cmd, e.message))
raise
else:
return out.strip()
tmpdir = os.environ.get('TMPDIR', '/tmp')
podman_args = [
'--storage-driver=vfs',
'--root={}/crio'.format(tmpdir),
'--runroot={}/crio-run'.format(tmpdir),
'--cni-config-dir={}/cni/net.d'.format(tmpdir),
]
run_podman = functools.partial(run_cmd, ['podman'], podman_args)
id = run_podman(['pull', 'alpine'])
setattr(PodmanTestCase, 'alpine_id', id)
run_podman(['pull', 'busybox'])
run_podman(['images'])
run_cmd(['rm', '-f', '{}/alpine_gold.tar'.format(tmpdir)])
run_podman([
'save', '--output', '{}/alpine_gold.tar'.format(tmpdir), 'alpine'
])
PodmanTestCase.alpine_log = open(
os.path.join('/tmp/', 'alpine.log'), 'w')
cmd = ['podman']
cmd.extend(podman_args)
# cmd.extend(['run', '-d', 'alpine', 'sleep', '500'])
cmd.extend(['run', '-dt', 'alpine', '/bin/sh'])
PodmanTestCase.alpine_process = subprocess.Popen(
cmd,
stdout=PodmanTestCase.alpine_log,
stderr=subprocess.STDOUT,
)
PodmanTestCase.busybox_log = open(
os.path.join('/tmp/', 'busybox.log'), 'w')
cmd = ['podman']
cmd.extend(podman_args)
cmd.extend(['create', 'busybox'])
PodmanTestCase.busybox_process = subprocess.Popen(
cmd,
stdout=PodmanTestCase.busybox_log,
stderr=subprocess.STDOUT,
)
# give podman time to start ctnr
time.sleep(2)
# Close our handle of file
PodmanTestCase.alpine_log.close()
PodmanTestCase.busybox_log.close()
@classmethod
def tearDownClass(cls):
try:
PodmanTestCase.alpine_process.kill()
assert 0 == PodmanTestCase.alpine_process.wait(500)
delattr(PodmanTestCase, 'alpine_process')
PodmanTestCase.busybox_process.kill()
assert 0 == PodmanTestCase.busybox_process.wait(500)
except Exception as e:
print('Exception: {}'.format(e))
raise
@contextlib.contextmanager
def assertRaisesNotImplemented(self):
with self.assertRaisesRegex(VarlinkError, MethodNotImplemented):
yield

View File

@ -0,0 +1,35 @@
from __future__ import absolute_import
import unittest
from unittest.mock import patch
import podman
from podman.client import BaseClient, Client, LocalClient, RemoteClient
class TestClient(unittest.TestCase):
def setUp(self):
pass
@patch('podman.libs.system.System.ping', return_value=True)
def test_local(self, mock_ping):
p = Client(
uri='unix:/run/podman',
interface='io.projectatomic.podman',
)
self.assertIsInstance(p._client, LocalClient)
self.assertIsInstance(p._client, BaseClient)
mock_ping.assert_called_once_with()
@patch('podman.libs.system.System.ping', return_value=True)
def test_remote(self, mock_ping):
p = Client(
uri='unix:/run/podman',
interface='io.projectatomic.podman',
remote_uri='ssh://user@hostname/run/podmain/podman',
identity_file='~/.ssh/id_rsa')
self.assertIsInstance(p._client, BaseClient)
mock_ping.assert_called_once_with()

View File

@ -0,0 +1,234 @@
import os
import signal
import unittest
from test.podman_testcase import PodmanTestCase
import podman
class TestContainers(PodmanTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
@classmethod
def tearDownClass(cls):
super().tearDownClass()
def setUp(self):
self.tmpdir = os.environ['TMPDIR']
self.host = os.environ['PODMAN_HOST']
self.pclient = podman.Client(self.host)
self.loadCache()
def tearDown(self):
pass
def loadCache(self):
self.containers = list(self.pclient.containers.list())
self.alpine_ctnr = next(
iter([c for c in self.containers if 'alpine' in c['image']] or []),
None)
if self.alpine_ctnr and self.alpine_ctnr.status != 'running':
self.alpine_ctnr.start()
def test_list(self):
self.assertGreaterEqual(len(self.containers), 2)
self.assertIsNotNone(self.alpine_ctnr)
self.assertIn('alpine', self.alpine_ctnr.image)
def test_delete_stopped(self):
before = len(self.containers)
self.alpine_ctnr.stop()
target = self.alpine_ctnr.id
actual = self.pclient.containers.delete_stopped()
self.assertIn(target, actual)
self.loadCache()
after = len(self.containers)
self.assertLess(after, before)
TestContainers.setUpClass()
def test_get(self):
actual = self.pclient.containers.get(self.alpine_ctnr.id)
for k in ['id', 'status', 'ports']:
self.assertEqual(actual[k], self.alpine_ctnr[k])
with self.assertRaises(podman.ContainerNotFound):
self.pclient.containers.get("bozo")
def test_attach(self):
# StringIO does not support fileno() so we had to go old school
input = os.path.join(self.tmpdir, 'test_attach.stdin')
output = os.path.join(self.tmpdir, 'test_attach.stdout')
with open(input, 'w+') as mock_in, open(output, 'w+') as mock_out:
# double quote is indeed in the expected place
mock_in.write('echo H"ello, World"; exit\n')
mock_in.seek(0, 0)
ctnr = self.pclient.images.get(self.alpine_ctnr.image).container(
detach=True, tty=True)
ctnr.attach(stdin=mock_in.fileno(), stdout=mock_out.fileno())
ctnr.start()
mock_out.flush()
mock_out.seek(0, 0)
output = mock_out.read()
self.assertIn('Hello', output)
ctnr.remove(force=True)
def test_processes(self):
actual = list(self.alpine_ctnr.processes())
self.assertGreaterEqual(len(actual), 2)
def test_start_stop_wait(self):
ctnr = self.alpine_ctnr.stop()
self.assertFalse(ctnr['running'])
ctnr.start()
self.assertTrue(ctnr.running)
ctnr.stop()
self.assertFalse(ctnr['containerrunning'])
actual = ctnr.wait()
self.assertGreaterEqual(actual, 0)
def test_changes(self):
actual = self.alpine_ctnr.changes()
self.assertListEqual(
sorted(['changed', 'added', 'deleted']), sorted(
list(actual.keys())))
# TODO: brittle, depends on knowing history of ctnr
self.assertGreaterEqual(len(actual['changed']), 2)
self.assertGreaterEqual(len(actual['added']), 3)
self.assertEqual(len(actual['deleted']), 0)
def test_kill(self):
self.assertTrue(self.alpine_ctnr.running)
ctnr = self.alpine_ctnr.kill(signal.SIGKILL)
self.assertFalse(ctnr.running)
def test_inspect(self):
actual = self.alpine_ctnr.inspect()
self.assertEqual(actual.id, self.alpine_ctnr.id)
# TODO: Datetime values from inspect missing offset in CI instance
# self.assertEqual(
# datetime_parse(actual.created),
# datetime_parse(self.alpine_ctnr.createdat))
def test_export(self):
target = os.path.join(self.tmpdir, 'alpine_export_ctnr.tar')
actual = self.alpine_ctnr.export(target)
self.assertEqual(actual, target)
self.assertTrue(os.path.isfile(target))
self.assertGreater(os.path.getsize(target), 0)
def test_commit(self):
# TODO: Test for STOPSIGNAL when supported by OCI
# TODO: Test for message when supported by OCI
details = self.pclient.images.get(self.alpine_ctnr.image).inspect()
changes = ['ENV=' + i for i in details.containerconfig['env']]
changes.append('CMD=/usr/bin/zsh')
changes.append('ENTRYPOINT=/bin/sh date')
changes.append('ENV=TEST=test_containers.TestContainers.test_commit')
changes.append('EXPOSE=80')
changes.append('EXPOSE=8888')
changes.append('LABEL=unittest=test_commit')
changes.append('USER=bozo:circus')
changes.append('VOLUME=/data')
changes.append('WORKDIR=/data/application')
id = self.alpine_ctnr.commit(
'alpine3', author='Bozo the clown', changes=changes, pause=True)
img = self.pclient.images.get(id)
self.assertIsNotNone(img)
details = img.inspect()
self.assertEqual(details.author, 'Bozo the clown')
self.assertListEqual(['/usr/bin/zsh'], details.containerconfig['cmd'])
self.assertListEqual(['/bin/sh date'],
details.containerconfig['entrypoint'])
self.assertIn('TEST=test_containers.TestContainers.test_commit',
details.containerconfig['env'])
self.assertTrue(
[e for e in details.containerconfig['env'] if 'PATH=' in e])
self.assertDictEqual({
'80': {},
'8888': {},
}, details.containerconfig['exposedports'])
self.assertDictEqual({'unittest': 'test_commit'}, details.labels)
self.assertEqual('bozo:circus', details.containerconfig['user'])
self.assertEqual({'/data': {}}, details.containerconfig['volumes'])
self.assertEqual('/data/application',
details.containerconfig['workingdir'])
def test_remove(self):
before = len(self.containers)
with self.assertRaises(podman.ErrorOccurred):
self.alpine_ctnr.remove()
self.assertEqual(
self.alpine_ctnr.id, self.alpine_ctnr.remove(force=True))
self.loadCache()
after = len(self.containers)
self.assertLess(after, before)
TestContainers.setUpClass()
def test_restart(self):
self.assertTrue(self.alpine_ctnr.running)
before = self.alpine_ctnr.runningfor
ctnr = self.alpine_ctnr.restart()
self.assertTrue(ctnr.running)
after = self.alpine_ctnr.runningfor
# TODO: restore check when restart zeros counter
# self.assertLess(after, before)
def test_rename(self):
with self.assertRaisesNotImplemented():
self.alpine_ctnr.rename('new_alpine')
def test_resize_tty(self):
with self.assertRaisesNotImplemented():
self.alpine_ctnr.resize_tty(132, 43)
def test_pause_unpause(self):
self.assertTrue(self.alpine_ctnr.running)
ctnr = self.alpine_ctnr.pause()
self.assertEqual(ctnr.status, 'paused')
ctnr = self.alpine_ctnr.unpause()
self.assertTrue(ctnr.running)
self.assertTrue(ctnr.status, 'running')
def test_stats(self):
self.assertTrue(self.alpine_ctnr.running)
actual = self.alpine_ctnr.stats()
self.assertEqual(self.alpine_ctnr.id, actual.id)
self.assertEqual(self.alpine_ctnr.names, actual.name)
def test_logs(self):
self.assertTrue(self.alpine_ctnr.running)
actual = list(self.alpine_ctnr.logs())
self.assertIsNotNone(actual)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,172 @@
import itertools
import os
import unittest
from collections import Counter
from datetime import datetime, timezone
from test.podman_testcase import PodmanTestCase
import podman
class TestImages(PodmanTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
@classmethod
def tearDownClass(cls):
super().tearDownClass()
def setUp(self):
self.tmpdir = os.environ['TMPDIR']
self.host = os.environ['PODMAN_HOST']
self.pclient = podman.Client(self.host)
self.images = self.loadCache()
def tearDown(self):
pass
def loadCache(self):
with podman.Client(self.host) as pclient:
self.images = list(pclient.images.list())
self.alpine_image = next(
iter([
i for i in self.images
if 'docker.io/library/alpine:latest' in i['repoTags']
] or []), None)
return self.images
def test_list(self):
actual = self.loadCache()
self.assertGreaterEqual(len(actual), 2)
self.assertIsNotNone(self.alpine_image)
def test_build(self):
path = os.path.join(self.tmpdir, 'ctnr', 'Dockerfile')
img, logs = self.pclient.images.build(
dockerfile=[path],
tags=['alpine-unittest'],
)
self.assertIsNotNone(img)
self.assertIn('localhost/alpine-unittest:latest', img.repoTags)
self.assertLess(
podman.datetime_parse(img.created), datetime.now(timezone.utc))
self.assertTrue(logs)
def test_create(self):
img_details = self.alpine_image.inspect()
actual = self.alpine_image.container()
self.assertIsNotNone(actual)
self.assertEqual(actual.status, 'configured')
ctnr = actual.start()
self.assertIn(ctnr.status, ['running', 'exited'])
ctnr_details = ctnr.inspect()
for e in img_details.containerconfig['env']:
self.assertIn(e, ctnr_details.config['env'])
def test_export(self):
path = os.path.join(self.tmpdir, 'alpine_export.tar')
target = 'oci-archive:{}:latest'.format(path)
actual = self.alpine_image.export(target, False)
self.assertTrue(actual)
self.assertTrue(os.path.isfile(path))
def test_get(self):
actual = self.pclient.images.get(self.alpine_image.id)
self.assertEqual(actual, self.alpine_image)
def test_history(self):
records = []
bucket = Counter()
for record in self.alpine_image.history():
self.assertIn(record.id, (self.alpine_image.id, '<missing>'))
bucket[record.id] += 1
records.append(record)
self.assertGreater(bucket[self.alpine_image.id], 0)
self.assertEqual(sum(bucket.values()), len(records))
def test_inspect(self):
actual = self.alpine_image.inspect()
self.assertEqual(actual.id, self.alpine_image.id)
def test_push(self):
path = '{}/alpine_push'.format(self.tmpdir)
target = 'dir:{}'.format(path)
self.alpine_image.push(target)
self.assertTrue(os.path.isfile(os.path.join(path, 'manifest.json')))
self.assertTrue(os.path.isfile(os.path.join(path, 'version')))
def test_tag(self):
self.assertEqual(self.alpine_image.id,
self.alpine_image.tag('alpine:fubar'))
self.loadCache()
self.assertIn('alpine:fubar', self.alpine_image.repoTags)
def test_remove(self):
before = self.loadCache()
# assertRaises doesn't follow the import name :(
with self.assertRaises(podman.ErrorOccurred):
self.alpine_image.remove()
actual = self.alpine_image.remove(force=True)
self.assertEqual(self.alpine_image.id, actual)
after = self.loadCache()
self.assertLess(len(after), len(before))
TestImages.setUpClass()
self.loadCache()
def test_import_delete_unused(self):
before = self.loadCache()
# create unused image, so we have something to delete
source = os.path.join(self.tmpdir, 'alpine_gold.tar')
new_img = self.pclient.images.import_image(
source,
'alpine2:latest',
'unittest.test_import',
)
after = self.loadCache()
self.assertEqual(len(before) + 1, len(after))
self.assertIsNotNone(
next(iter([i for i in after if new_img in i['id']] or []), None))
actual = self.pclient.images.delete_unused()
self.assertIn(new_img, actual)
after = self.loadCache()
self.assertGreaterEqual(len(before), len(after))
TestImages.setUpClass()
self.loadCache()
def test_pull(self):
before = self.loadCache()
actual = self.pclient.images.pull('prom/busybox:latest')
after = self.loadCache()
self.assertEqual(len(before) + 1, len(after))
self.assertIsNotNone(
next(iter([i for i in after if actual in i['id']] or []), None))
def test_search(self):
actual = self.pclient.images.search('alpine', 25)
names, length = itertools.tee(actual)
for img in names:
self.assertIn('alpine', img.name)
self.assertTrue(0 < len(list(length)) <= 25)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,53 @@
import datetime
import unittest
import podman
class TestLibs(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_parse(self):
expected = datetime.datetime.strptime(
'2018-05-08T14:12:53.797795-0700', '%Y-%m-%dT%H:%M:%S.%f%z')
for v in [
'2018-05-08T14:12:53.797795191-07:00',
'2018-05-08T14:12:53.797795-07:00',
'2018-05-08T14:12:53.797795-0700',
'2018-05-08 14:12:53.797795191 -0700 MST',
]:
actual = podman.datetime_parse(v)
self.assertEqual(actual, expected)
expected = datetime.datetime.strptime(
'2018-05-08T14:12:53.797795-0000', '%Y-%m-%dT%H:%M:%S.%f%z')
for v in [
'2018-05-08T14:12:53.797795191Z',
'2018-05-08T14:12:53.797795191z',
]:
actual = podman.datetime_parse(v)
self.assertEqual(actual, expected)
actual = podman.datetime_parse(datetime.datetime.now().isoformat())
self.assertIsNotNone(actual)
def test_parse_fail(self):
for v in [
'There is no time here.',
]:
with self.assertRaises(ValueError):
podman.datetime_parse(v)
def test_format(self):
expected = '2018-05-08T18:24:52.753227-07:00'
dt = podman.datetime_parse(expected)
actual = podman.datetime_format(dt)
self.assertEqual(actual, expected)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,141 @@
#!/bin/bash
# podman needs to play some games with resources
if [[ $(id -u) != 0 ]]; then
echo >&2 $0 must be run as root.
exit 2
fi
# setup path to find new binaries _NOT_ system binaries
if [[ ! -x ../../../bin/podman ]]; then
echo 1>&2 Cannot find podman binary from libpod root directory. Run \"make binaries\"
exit 1
fi
export PATH=../../../bin:$PATH
function usage {
echo 1>&2 $0 [-v] [-h] [test.TestCase|test.TestCase.step]
}
while getopts "vh" arg; do
case $arg in
v ) VERBOSE='-v' ;;
h ) usage ; exit 0;;
\? ) usage ; exit 2;;
esac
done
shift $((OPTIND -1))
function cleanup {
# aggressive cleanup as tests may crash leaving crap around
umount '^(shm|nsfs)'
umount '\/run\/netns'
rm -r "$1"
}
# Create temporary directory for storage
export TMPDIR=`mktemp -d /tmp/podman.XXXXXXXXXX`
trap "cleanup $TMPDIR" EXIT
function umount {
# xargs -r always ran once, so write any mount points to file first
mount |awk "/$1/"' { print $3 }' >${TMPDIR}/mounts
if [[ -s ${TMPDIR}/mounts ]]; then
xargs <${TMPDIR}/mounts -t umount
fi
}
function showlog {
[[ -s $1 ]] && cat <<-EOT
$1 =====
$(cat "$1")
EOT
}
# Need locations to store stuff
mkdir -p ${TMPDIR}/{podman,crio,crio-run,cni/net.d,ctnr,tunnel}
# Cannot be done in python unittest fixtures. EnvVar not picked up.
export REGISTRIES_CONFIG_PATH=${TMPDIR}/registry.conf
cat >$REGISTRIES_CONFIG_PATH <<-EOT
[registries.search]
registries = ['docker.io']
[registries.insecure]
registries = []
[registries.block]
registries = []
EOT
export CNI_CONFIG_PATH=${TMPDIR}/cni/net.d
cat >$CNI_CONFIG_PATH/87-podman-bridge.conflist <<-EOT
{
"cniVersion": "0.3.0",
"name": "podman",
"plugins": [{
"type": "bridge",
"bridge": "cni0",
"isGateway": true,
"ipMasq": true,
"ipam": {
"type": "host-local",
"subnet": "10.88.0.0/16",
"routes": [{
"dst": "0.0.0.0/0"
}]
}
},
{
"type": "portmap",
"capabilities": {
"portMappings": true
}
}
]
}
EOT
cat >$TMPDIR/ctnr/hello.sh <<-EOT
echo 'Hello, World'
EOT
cat >$TMPDIR/ctnr/Dockerfile <<-EOT
FROM alpine:latest
COPY ./hello.sh /tmp/hello.sh
RUN chmod 755 /tmp/hello.sh
ENTRYPOINT ["/tmp/hello.sh"]
EOT
export PODMAN_HOST="unix:${TMPDIR}/podman/io.projectatomic.podman"
PODMAN_ARGS="--storage-driver=vfs \
--root=${TMPDIR}/crio \
--runroot=${TMPDIR}/crio-run \
--cni-config-dir=$CNI_CONFIG_PATH \
"
if [[ -n $VERBOSE ]]; then
PODMAN_ARGS="$PODMAN_ARGS --log-level=debug"
fi
PODMAN="podman $PODMAN_ARGS"
# document what we're about to do...
$PODMAN --version
set -x
# Run podman in background without systemd for test purposes
$PODMAN varlink --timeout=0 ${PODMAN_HOST} >/tmp/test_runner.output 2>&1 &
if [[ -z $1 ]]; then
export PYTHONPATH=.
python3 -m unittest discover -s . $VERBOSE
else
export PYTHONPATH=.:./test
python3 -m unittest $1 $VERBOSE
fi
set +x
pkill -9 podman
pkill -9 conmon
showlog /tmp/test_runner.output
showlog /tmp/alpine.log
showlog /tmp/busybox.log

View File

@ -0,0 +1,62 @@
import os
import unittest
from urllib.parse import urlparse
import podman
import varlink
class TestSystem(unittest.TestCase):
def setUp(self):
self.host = os.environ['PODMAN_HOST']
self.tmpdir = os.environ['TMPDIR']
def tearDown(self):
pass
def test_bad_address(self):
with self.assertRaisesRegex(varlink.client.ConnectionError,
"Invalid address 'bad address'"):
podman.Client('bad address')
def test_ping(self):
with podman.Client(self.host) as pclient:
self.assertTrue(pclient.system.ping())
def test_remote_ping(self):
host = urlparse(self.host)
remote_uri = 'ssh://root@localhost/{}'.format(host.path)
local_uri = 'unix:{}/tunnel/podman.sock'.format(self.tmpdir)
with podman.Client(
uri=local_uri,
remote_uri=remote_uri,
identity_file=os.path.expanduser('~/.ssh/id_rsa'),
) as remote_client:
remote_client.system.ping()
def test_versions(self):
with podman.Client(self.host) as pclient:
# Values change with each build so we cannot test too much
self.assertListEqual(
sorted([
'built', 'client_version', 'git_commit', 'go_version',
'os_arch', 'version'
]), sorted(list(pclient.system.versions._fields)))
pclient.system.versions
self.assertIsNot(podman.__version__, '0.0.0')
def test_info(self):
with podman.Client(self.host) as pclient:
actual = pclient.system.info()
# Values change too much to do exhaustive testing
self.assertIsNotNone(actual.podman['go_version'])
self.assertListEqual(
sorted([
'host', 'insecure_registries', 'podman', 'registries',
'store'
]), sorted(list(actual._fields)))
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,79 @@
from __future__ import absolute_import
import time
import unittest
from unittest.mock import MagicMock, patch
import podman
from podman.libs.tunnel import Context, Portal, Tunnel
class TestTunnel(unittest.TestCase):
def setUp(self):
self.tunnel_01 = MagicMock(spec=Tunnel)
self.tunnel_02 = MagicMock(spec=Tunnel)
def test_portal_ops(self):
portal = Portal(sweap=500)
portal['unix:/01'] = self.tunnel_01
portal['unix:/02'] = self.tunnel_02
self.assertEqual(portal.get('unix:/01'), self.tunnel_01)
self.assertEqual(portal.get('unix:/02'), self.tunnel_02)
del portal['unix:/02']
with self.assertRaises(KeyError):
portal['unix:/02']
self.assertEqual(len(portal), 1)
def test_portal_reaping(self):
portal = Portal(sweap=0.5)
portal['unix:/01'] = self.tunnel_01
portal['unix:/02'] = self.tunnel_02
self.assertEqual(len(portal), 2)
for entry in portal:
self.assertIn(entry, (self.tunnel_01, self.tunnel_02))
time.sleep(1)
portal.reap()
self.assertEqual(len(portal), 0)
def test_portal_no_reaping(self):
portal = Portal(sweap=500)
portal['unix:/01'] = self.tunnel_01
portal['unix:/02'] = self.tunnel_02
portal.reap()
self.assertEqual(len(portal), 2)
for entry in portal:
self.assertIn(entry, (self.tunnel_01, self.tunnel_02))
@patch('subprocess.Popen')
@patch('os.path.exists', return_value=True)
@patch('weakref.finalize')
def test_tunnel(self, mock_finalize, mock_exists, mock_Popen):
context = Context(
'unix:/01',
'io.projectatomic.podman',
'/tmp/user/socket',
'/run/podman/socket',
'user',
'hostname',
'~/.ssh/id_rsa',
)
tunnel = Tunnel(context).bore('unix:/01')
cmd = [
'ssh',
'-fNTq',
'-L',
'{}:{}'.format(context.local_socket, context.remote_socket),
'-i',
context.identity_file,
'ssh://{}@{}'.format(context.username, context.hostname),
]
mock_finalize.assert_called_once_with(tunnel, tunnel.close, 'unix:/01')
mock_exists.assert_called_once_with(context.local_socket)
mock_Popen.assert_called_once_with(cmd, close_fds=True)