From fc6dcd12b3430f2d1ee495ef19d184a088f3bb34 Mon Sep 17 00:00:00 2001
From: Matthew Heon <matthew.heon@pm.me>
Date: Fri, 16 Sep 2022 15:00:37 -0400
Subject: [PATCH] Add support for 'image' volume driver

We added the concept of image volumes in 2.2.0, to support
inspecting an image from within a container. However, this is a
strictly read-only mount, with no modification allowed.

By contrast, the new `image` volume driver creates a c/storage
container as its underlying storage, so we have a read/write
layer. This, in and of itself, is not especially interesting, but
what it will enable in the future is. If we add a new command to
allow these image volumes to be committed, we can now distribute
volumes - and changes to them - via a standard OCI image registry
(which is rather new and quite exciting).

Future work in this area:
- Add support for `podman volume push` (commit volume changes and
  push resulting image to OCI registry).
- Add support for `podman volume pull` (currently, we require
  that the image a volume is created from be already pulled; it
  would be simpler if we had a dedicated command that did the
  pull and made a volume from it)
- Add support for scratch images (make an empty image on demand
  to use as the base of the volume)
- Add UOR support to `podman volume push` and
  `podman volume pull` to enable both with non-image volume
  drivers

Signed-off-by: Matthew Heon <matthew.heon@pm.me>
---
 .../source/markdown/podman-volume-create.1.md | 15 ++++-
 libpod/boltdb_state.go                        | 53 ++++++++++++++++
 libpod/boltdb_state_internal.go               | 14 ++++-
 libpod/define/config.go                       |  4 ++
 libpod/define/volume_inspect.go               |  3 +
 libpod/runtime.go                             |  3 +
 libpod/runtime_cstorage.go                    | 11 ++++
 libpod/runtime_ctr.go                         |  2 +-
 libpod/runtime_img.go                         | 21 +++++++
 libpod/runtime_volume_linux.go                | 61 ++++++++++++++++++-
 libpod/state.go                               |  8 +++
 libpod/volume.go                              | 17 +++++-
 libpod/volume_inspect.go                      |  1 +
 libpod/volume_internal.go                     |  5 ++
 libpod/volume_internal_linux.go               | 16 +++++
 test/e2e/config/containers.conf               |  1 +
 test/e2e/volume_create_test.go                | 54 ++++++++++++++++
 test/e2e/volume_plugin_test.go                |  9 ++-
 18 files changed, 289 insertions(+), 9 deletions(-)

diff --git a/docs/source/markdown/podman-volume-create.1.md b/docs/source/markdown/podman-volume-create.1.md
index 9344881112..1e99df55a9 100644
--- a/docs/source/markdown/podman-volume-create.1.md
+++ b/docs/source/markdown/podman-volume-create.1.md
@@ -17,7 +17,13 @@ driver options can be set using the **--opt** flag.
 
 #### **--driver**=*driver*
 
-Specify the volume driver name (default **local**). Setting this to a value other than **local** Podman attempts to create the volume using a volume plugin with the given name. Such plugins must be defined in the **volume_plugins** section of the **[containers.conf(5)](https://github.com/containers/common/blob/main/docs/containers.conf.5.md)** configuration file.
+Specify the volume driver name (default **local**).
+There are two drivers supported by Podman itself: **local** and **image**.
+The **local** driver uses a directory on disk as the backend by default, but can also use the **mount(8)** command to mount a filesystem as the volume if **--opt** is specified.
+The **image** driver uses an image as the backing store of for the volume.
+An overlay filesystem will be created, which allows changes to the volume to be committed as a new layer on top of the image.
+Using a value other than **local or **image**, Podman will attempt to create the volume using a volume plugin with the given name.
+Such plugins must be defined in the **volume_plugins** section of the **[containers.conf(5)](https://github.com/containers/common/blob/main/docs/containers.conf.5.md)** configuration file.
 
 #### **--help**
 
@@ -43,7 +49,10 @@ The `o` option sets options for the mount, and is equivalent to the `-o` flag to
   - The `o` option supports using volume options other than the UID/GID options with the **local** driver and requires root privileges.
   - The `o` options supports the `timeout` option which allows users to set a driver specific timeout in seconds before volume creation fails. For example, **--opts=o=timeout=10** sets a driver timeout of 10 seconds.
 
-When not using the **local** driver, the given options are passed directly to the volume plugin. In this case, supported options are dictated by the plugin in question, not Podman.
+For the **image** driver, the only supported option is `image`, which specifies the image the volume is based on.
+This option is mandatory when using the **image** driver.
+
+When not using the **local** and **image** drivers, the given options are passed directly to the volume plugin. In this case, supported options are dictated by the plugin in question, not Podman.
 
 ## EXAMPLES
 
@@ -57,6 +66,8 @@ $ podman volume create --label foo=bar myvol
 # podman volume create --opt device=tmpfs --opt type=tmpfs --opt o=nodev,noexec myvol
 
 # podman volume create --opt device=tmpfs --opt type=tmpfs --opt o=uid=1000,gid=1000 testvol
+
+# podman volume create --driver image --opt image=fedora:latest fedoraVol
 ```
 
 ## QUOTAS
diff --git a/libpod/boltdb_state.go b/libpod/boltdb_state.go
index 4fd95a3cfb..77c2348925 100644
--- a/libpod/boltdb_state.go
+++ b/libpod/boltdb_state.go
@@ -109,6 +109,7 @@ func NewBoltState(path string, runtime *Runtime) (State, error) {
 		runtimeConfigBkt,
 		exitCodeBkt,
 		exitCodeTimeStampBkt,
+		volCtrsBkt,
 	}
 
 	// Does the DB need an update?
@@ -2551,6 +2552,11 @@ func (s *BoltState) AddVolume(volume *Volume) error {
 			return err
 		}
 
+		volCtrsBkt, err := getVolumeContainersBucket(tx)
+		if err != nil {
+			return err
+		}
+
 		// Check if we already have a volume with the given name
 		volExists := allVolsBkt.Get(volName)
 		if volExists != nil {
@@ -2580,6 +2586,12 @@ func (s *BoltState) AddVolume(volume *Volume) error {
 			}
 		}
 
+		if volume.config.StorageID != "" {
+			if err := volCtrsBkt.Put([]byte(volume.config.StorageID), volName); err != nil {
+				return fmt.Errorf("storing volume %s container ID in DB: %w", volume.Name(), err)
+			}
+		}
+
 		if err := allVolsBkt.Put(volName, volName); err != nil {
 			return fmt.Errorf("storing volume %s in all volumes bucket in DB: %w", volume.Name(), err)
 		}
@@ -2619,6 +2631,11 @@ func (s *BoltState) RemoveVolume(volume *Volume) error {
 			return err
 		}
 
+		volCtrIDBkt, err := getVolumeContainersBucket(tx)
+		if err != nil {
+			return err
+		}
+
 		// Check if the volume exists
 		volDB := volBkt.Bucket(volName)
 		if volDB == nil {
@@ -2665,6 +2682,11 @@ func (s *BoltState) RemoveVolume(volume *Volume) error {
 		if err := volBkt.DeleteBucket(volName); err != nil {
 			return fmt.Errorf("removing volume %s from DB: %w", volume.Name(), err)
 		}
+		if volume.config.StorageID != "" {
+			if err := volCtrIDBkt.Delete([]byte(volume.config.StorageID)); err != nil {
+				return fmt.Errorf("removing volume %s container ID from DB: %w", volume.Name(), err)
+			}
+		}
 
 		return nil
 	})
@@ -3618,3 +3640,34 @@ func (s *BoltState) AllPods() ([]*Pod, error) {
 
 	return pods, nil
 }
+
+// ContainerIDIsVolume checks if the given c/storage container ID is used as
+// backing storage for a volume.
+func (s *BoltState) ContainerIDIsVolume(id string) (bool, error) {
+	if !s.valid {
+		return false, define.ErrDBClosed
+	}
+
+	isVol := false
+
+	db, err := s.getDBCon()
+	if err != nil {
+		return false, err
+	}
+	defer s.deferredCloseDBCon(db)
+
+	err = db.View(func(tx *bolt.Tx) error {
+		volCtrsBkt, err := getVolumeContainersBucket(tx)
+		if err != nil {
+			return err
+		}
+
+		volName := volCtrsBkt.Get([]byte(id))
+		if volName != nil {
+			isVol = true
+		}
+
+		return nil
+	})
+	return isVol, err
+}
diff --git a/libpod/boltdb_state_internal.go b/libpod/boltdb_state_internal.go
index 87f1fa4ebc..7f2d49b316 100644
--- a/libpod/boltdb_state_internal.go
+++ b/libpod/boltdb_state_internal.go
@@ -28,6 +28,7 @@ const (
 	execName          = "exec"
 	aliasesName       = "aliases"
 	runtimeConfigName = "runtime-config"
+	volumeCtrsName    = "volume-ctrs"
 
 	exitCodeName          = "exit-code"
 	exitCodeTimeStampName = "exit-code-time-stamp"
@@ -67,6 +68,7 @@ var (
 	dependenciesBkt    = []byte(dependenciesName)
 	volDependenciesBkt = []byte(volCtrDependencies)
 	networksBkt        = []byte(networksName)
+	volCtrsBkt         = []byte(volumeCtrsName)
 
 	exitCodeBkt          = []byte(exitCodeName)
 	exitCodeTimeStampBkt = []byte(exitCodeTimeStampName)
@@ -384,6 +386,14 @@ func getExitCodeTimeStampBucket(tx *bolt.Tx) (*bolt.Bucket, error) {
 	return bkt, nil
 }
 
+func getVolumeContainersBucket(tx *bolt.Tx) (*bolt.Bucket, error) {
+	bkt := tx.Bucket(volCtrsBkt)
+	if bkt == nil {
+		return nil, fmt.Errorf("volume containers bucket not found in DB: %w", define.ErrDBBadConfig)
+	}
+	return bkt, nil
+}
+
 func (s *BoltState) getContainerConfigFromDB(id []byte, config *ContainerConfig, ctrsBkt *bolt.Bucket) error {
 	ctrBkt := ctrsBkt.Bucket(id)
 	if ctrBkt == nil {
@@ -528,6 +538,9 @@ func (s *BoltState) getVolumeFromDB(name []byte, volume *Volume, volBkt *bolt.Bu
 		}
 	}
 
+	// Need this for UsesVolumeDriver() so set it now.
+	volume.runtime = s.runtime
+
 	// Retrieve volume driver
 	if volume.UsesVolumeDriver() {
 		plugin, err := s.runtime.getVolumePlugin(volume.config)
@@ -550,7 +563,6 @@ func (s *BoltState) getVolumeFromDB(name []byte, volume *Volume, volBkt *bolt.Bu
 	}
 	volume.lock = lock
 
-	volume.runtime = s.runtime
 	volume.valid = true
 
 	return nil
diff --git a/libpod/define/config.go b/libpod/define/config.go
index 1fad5cc9af..0427206edb 100644
--- a/libpod/define/config.go
+++ b/libpod/define/config.go
@@ -40,6 +40,10 @@ type InfoData struct {
 // itself.
 const VolumeDriverLocal = "local"
 
+// VolumeDriverImage is the "image" volume driver. It is managed by Libpod and
+// uses volumes backed by an image.
+const VolumeDriverImage = "image"
+
 const (
 	OCIManifestDir  = "oci-dir"
 	OCIArchive      = "oci-archive"
diff --git a/libpod/define/volume_inspect.go b/libpod/define/volume_inspect.go
index 76120647cc..4d6f12080a 100644
--- a/libpod/define/volume_inspect.go
+++ b/libpod/define/volume_inspect.go
@@ -58,6 +58,9 @@ type InspectVolumeData struct {
 	NeedsChown bool `json:"NeedsChown,omitempty"`
 	// Timeout is the specified driver timeout if given
 	Timeout uint `json:"Timeout,omitempty"`
+	// StorageID is the ID of the container backing the volume in c/storage.
+	// Only used with Image Volumes.
+	StorageID string `json:"StorageID,omitempty"`
 }
 
 type VolumeReload struct {
diff --git a/libpod/runtime.go b/libpod/runtime.go
index 83c9f53e26..f250d759c8 100644
--- a/libpod/runtime.go
+++ b/libpod/runtime.go
@@ -1091,6 +1091,9 @@ func (r *Runtime) getVolumePlugin(volConfig *VolumeConfig) (*plugin.VolumePlugin
 
 	pluginPath, ok := r.config.Engine.VolumePlugins[name]
 	if !ok {
+		if name == define.VolumeDriverImage {
+			return nil, nil
+		}
 		return nil, fmt.Errorf("no volume plugin with name %s available: %w", name, define.ErrMissingPlugin)
 	}
 
diff --git a/libpod/runtime_cstorage.go b/libpod/runtime_cstorage.go
index 372434b491..5917b79314 100644
--- a/libpod/runtime_cstorage.go
+++ b/libpod/runtime_cstorage.go
@@ -86,6 +86,17 @@ func (r *Runtime) RemoveStorageContainer(idOrName string, force bool) error {
 		return fmt.Errorf("refusing to remove %q as it exists in libpod as container %s: %w", idOrName, ctr.ID, define.ErrCtrExists)
 	}
 
+	// Error out if this is an image-backed volume
+	allVols, err := r.state.AllVolumes()
+	if err != nil {
+		return err
+	}
+	for _, vol := range allVols {
+		if vol.config.Driver == define.VolumeDriverImage && vol.config.StorageID == ctr.ID {
+			return fmt.Errorf("refusing to remove %q as it exists in libpod as an image-backed volume %s: %w", idOrName, vol.Name(), define.ErrCtrExists)
+		}
+	}
+
 	if !force {
 		timesMounted, err := r.store.Mounted(ctr.ID)
 		if err != nil {
diff --git a/libpod/runtime_ctr.go b/libpod/runtime_ctr.go
index 7b3cbadfa0..bb30078cb5 100644
--- a/libpod/runtime_ctr.go
+++ b/libpod/runtime_ctr.go
@@ -512,7 +512,7 @@ func (r *Runtime) setupContainer(ctx context.Context, ctr *Container) (_ *Contai
 				volOptions = append(volOptions, parsedOptions...)
 			}
 		}
-		newVol, err := r.newVolume(false, volOptions...)
+		newVol, err := r.newVolume(ctx, false, volOptions...)
 		if err != nil {
 			return nil, fmt.Errorf("creating named volume %q: %w", vol.Name, err)
 		}
diff --git a/libpod/runtime_img.go b/libpod/runtime_img.go
index 87b77c3eba..d8e88ca503 100644
--- a/libpod/runtime_img.go
+++ b/libpod/runtime_img.go
@@ -51,6 +51,23 @@ func (r *Runtime) RemoveContainersForImageCallback(ctx context.Context) libimage
 				}
 			}
 		}
+
+		// Need to handle volumes with the image driver
+		vols, err := r.state.AllVolumes()
+		if err != nil {
+			return err
+		}
+		for _, vol := range vols {
+			if vol.config.Driver != define.VolumeDriverImage || vol.config.StorageImageID != imageID {
+				continue
+			}
+			// Do a force removal of the volume, and all containers
+			// using it.
+			if err := r.RemoveVolume(ctx, vol, true, nil); err != nil {
+				return fmt.Errorf("removing image %s: volume %s backed by image could not be removed: %w", imageID, vol.Name(), err)
+			}
+		}
+
 		// Note that `libimage` will take care of removing any leftover
 		// containers from the storage.
 		return nil
@@ -74,6 +91,10 @@ func (r *Runtime) IsExternalContainerCallback(_ context.Context) libimage.IsExte
 		if errors.Is(err, define.ErrNoSuchCtr) {
 			return true, nil
 		}
+		isVol, err := r.state.ContainerIDIsVolume(idOrName)
+		if err == nil && !isVol {
+			return true, nil
+		}
 		return false, nil
 	}
 }
diff --git a/libpod/runtime_volume_linux.go b/libpod/runtime_volume_linux.go
index 08fdbf9771..c59417979e 100644
--- a/libpod/runtime_volume_linux.go
+++ b/libpod/runtime_volume_linux.go
@@ -15,6 +15,7 @@ import (
 	"github.com/containers/podman/v4/libpod/define"
 	"github.com/containers/podman/v4/libpod/events"
 	volplugin "github.com/containers/podman/v4/libpod/plugin"
+	"github.com/containers/storage"
 	"github.com/containers/storage/drivers/quota"
 	"github.com/containers/storage/pkg/idtools"
 	"github.com/containers/storage/pkg/stringid"
@@ -22,18 +23,20 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
+const volumeSuffix = "+volume"
+
 // NewVolume creates a new empty volume
 func (r *Runtime) NewVolume(ctx context.Context, options ...VolumeCreateOption) (*Volume, error) {
 	if !r.valid {
 		return nil, define.ErrRuntimeStopped
 	}
-	return r.newVolume(false, options...)
+	return r.newVolume(ctx, false, options...)
 }
 
 // newVolume creates a new empty volume with the given options.
 // The createPluginVolume can be set to true to make it not create the volume in the volume plugin,
 // this is required for the UpdateVolumePlugins() function. If you are not sure, set this to false.
-func (r *Runtime) newVolume(noCreatePluginVolume bool, options ...VolumeCreateOption) (_ *Volume, deferredErr error) {
+func (r *Runtime) newVolume(ctx context.Context, noCreatePluginVolume bool, options ...VolumeCreateOption) (_ *Volume, deferredErr error) {
 	volume := newVolume(r)
 	for _, option := range options {
 		if err := option(volume); err != nil {
@@ -83,6 +86,50 @@ func (r *Runtime) newVolume(noCreatePluginVolume bool, options ...VolumeCreateOp
 				return nil, fmt.Errorf("invalid mount option %s for driver 'local': %w", key, define.ErrInvalidArg)
 			}
 		}
+	} else if volume.config.Driver == define.VolumeDriverImage && !volume.UsesVolumeDriver() {
+		logrus.Debugf("Creating image-based volume")
+		var imgString string
+		// Validate options
+		for key, val := range volume.config.Options {
+			switch strings.ToLower(key) {
+			case "image":
+				imgString = val
+			default:
+				return nil, fmt.Errorf("invalid mount option %s for driver 'image': %w", key, define.ErrInvalidArg)
+			}
+		}
+
+		if imgString == "" {
+			return nil, fmt.Errorf("must provide an image name when creating a volume with the image driver: %w", define.ErrInvalidArg)
+		}
+
+		// Look up the image
+		image, _, err := r.libimageRuntime.LookupImage(imgString, nil)
+		if err != nil {
+			return nil, fmt.Errorf("looking up image %s to create volume failed: %w", imgString, err)
+		}
+
+		// Generate a c/storage name and ID for the volume.
+		// Use characters Podman does not allow for the name, to ensure
+		// no collision with containers.
+		volume.config.StorageID = stringid.GenerateRandomID()
+		volume.config.StorageName = volume.config.Name + volumeSuffix
+		volume.config.StorageImageID = image.ID()
+
+		// Create a backing container in c/storage.
+		storageConfig := storage.ContainerOptions{
+			LabelOpts: []string{"filetype:container_file_t:s0"},
+		}
+		if _, err := r.storageService.CreateContainerStorage(ctx, r.imageContext, imgString, image.ID(), volume.config.StorageName, volume.config.StorageID, storageConfig); err != nil {
+			return nil, fmt.Errorf("creating backing storage for image driver: %w", err)
+		}
+		defer func() {
+			if deferredErr != nil {
+				if err := r.storageService.DeleteContainer(volume.config.StorageID); err != nil {
+					logrus.Errorf("Error removing volume %s backing storage: %v", volume.config.Name, err)
+				}
+			}
+		}()
 	}
 
 	// Now we get conditional: we either need to make the volume in the
@@ -196,7 +243,7 @@ func (r *Runtime) UpdateVolumePlugins(ctx context.Context) *define.VolumeReload
 		}
 		for _, vol := range vols {
 			allPluginVolumes[vol.Name] = struct{}{}
-			if _, err := r.newVolume(true, WithVolumeName(vol.Name), WithVolumeDriver(driverName)); err != nil {
+			if _, err := r.newVolume(ctx, true, WithVolumeName(vol.Name), WithVolumeDriver(driverName)); err != nil {
 				// If the volume exists this is not an error, just ignore it and log. It is very likely
 				// that the volume from the plugin was already in our db.
 				if !errors.Is(err, define.ErrVolumeExists) {
@@ -375,6 +422,14 @@ func (r *Runtime) removeVolume(ctx context.Context, v *Volume, force bool, timeo
 				return fmt.Errorf("volume %s could not be removed from plugin %s: %w", v.Name(), v.Driver(), err)
 			}
 		}
+	} else if v.config.Driver == define.VolumeDriverImage {
+		if err := v.runtime.storageService.DeleteContainer(v.config.StorageID); err != nil {
+			// Storage container is already gone, no problem.
+			if !(errors.Is(err, storage.ErrNotAContainer) || errors.Is(err, storage.ErrContainerUnknown)) {
+				return fmt.Errorf("removing volume %s storage: %w", v.Name(), err)
+			}
+			logrus.Infof("Storage for volume %s already removed", v.Name())
+		}
 	}
 
 	// Remove the volume from the state
diff --git a/libpod/state.go b/libpod/state.go
index 4fbd3c302a..9d96045635 100644
--- a/libpod/state.go
+++ b/libpod/state.go
@@ -144,6 +144,14 @@ type State interface {
 	// As with RemoveExecSession, container state will not be modified.
 	RemoveContainerExecSessions(ctr *Container) error
 
+	// ContainerIDIsVolume checks if the given container ID is in use by a
+	// volume.
+	// Some volumes are backed by a c/storage container. These do not have a
+	// corresponding Container struct in Libpod, but rather a Volume.
+	// This determines if a given ID from c/storage is used as a backend by
+	// a Podman volume.
+	ContainerIDIsVolume(id string) (bool, error)
+
 	// PLEASE READ FULL DESCRIPTION BEFORE USING.
 	// Rewrite a container's configuration.
 	// This function breaks libpod's normal prohibition on a read-only
diff --git a/libpod/volume.go b/libpod/volume.go
index a054e40322..2d4ea4280e 100644
--- a/libpod/volume.go
+++ b/libpod/volume.go
@@ -57,6 +57,15 @@ type VolumeConfig struct {
 	DisableQuota bool `json:"disableQuota,omitempty"`
 	// Timeout allows users to override the default driver timeout of 5 seconds
 	Timeout *uint `json:"timeout,omitempty"`
+	// StorageName is the name of the volume in c/storage. Only used for
+	// image volumes.
+	StorageName string `json:"storageName,omitempty"`
+	// StorageID is the ID of the volume in c/storage. Only used for image
+	// volumes.
+	StorageID string `json:"storageID,omitempty"`
+	// StorageImageID is the ID of the image the volume was based off of.
+	// Only used for image volumes.
+	StorageImageID string `json:"storageImageID,omitempty"`
 }
 
 // VolumeState holds the volume's mutable state.
@@ -149,7 +158,7 @@ func (v *Volume) MountCount() (uint, error) {
 
 // Internal-only helper for volume mountpoint
 func (v *Volume) mountPoint() string {
-	if v.UsesVolumeDriver() {
+	if v.UsesVolumeDriver() || v.config.Driver == define.VolumeDriverImage {
 		return v.state.MountPoint
 	}
 
@@ -250,6 +259,12 @@ func (v *Volume) IsDangling() (bool, error) {
 // drivers are pluggable backends for volumes that will manage the storage and
 // mounting.
 func (v *Volume) UsesVolumeDriver() bool {
+	if v.config.Driver == define.VolumeDriverImage {
+		if _, ok := v.runtime.config.Engine.VolumePlugins[v.config.Driver]; ok {
+			return true
+		}
+		return false
+	}
 	return !(v.config.Driver == define.VolumeDriverLocal || v.config.Driver == "")
 }
 
diff --git a/libpod/volume_inspect.go b/libpod/volume_inspect.go
index 73441576b0..31fbd5eff0 100644
--- a/libpod/volume_inspect.go
+++ b/libpod/volume_inspect.go
@@ -64,6 +64,7 @@ func (v *Volume) Inspect() (*define.InspectVolumeData, error) {
 	data.MountCount = v.state.MountCount
 	data.NeedsCopyUp = v.state.NeedsCopyUp
 	data.NeedsChown = v.state.NeedsChown
+	data.StorageID = v.config.StorageID
 
 	if v.config.Timeout != nil {
 		data.Timeout = *v.config.Timeout
diff --git a/libpod/volume_internal.go b/libpod/volume_internal.go
index 43c3f9b0be..14b852f8e3 100644
--- a/libpod/volume_internal.go
+++ b/libpod/volume_internal.go
@@ -39,6 +39,11 @@ func (v *Volume) needsMount() bool {
 		return true
 	}
 
+	// Image driver always needs mount
+	if v.config.Driver == define.VolumeDriverImage {
+		return true
+	}
+
 	// Commit 28138dafcc added the UID and GID options to this map
 	// However we should only mount when options other than uid and gid are set.
 	// see https://github.com/containers/podman/issues/10620
diff --git a/libpod/volume_internal_linux.go b/libpod/volume_internal_linux.go
index cfd60554d5..440bceec30 100644
--- a/libpod/volume_internal_linux.go
+++ b/libpod/volume_internal_linux.go
@@ -63,6 +63,15 @@ func (v *Volume) mount() error {
 			return err
 		}
 
+		v.state.MountCount++
+		v.state.MountPoint = mountPoint
+		return v.save()
+	} else if v.config.Driver == define.VolumeDriverImage {
+		mountPoint, err := v.runtime.storageService.MountContainerImage(v.config.StorageID)
+		if err != nil {
+			return fmt.Errorf("mounting volume %s image failed: %w", v.Name(), err)
+		}
+
 		v.state.MountCount++
 		v.state.MountPoint = mountPoint
 		return v.save()
@@ -159,6 +168,13 @@ func (v *Volume) unmount(force bool) error {
 				return err
 			}
 
+			v.state.MountPoint = ""
+			return v.save()
+		} else if v.config.Driver == define.VolumeDriverImage {
+			if _, err := v.runtime.storageService.UnmountContainerImage(v.config.StorageID, force); err != nil {
+				return fmt.Errorf("unmounting volume %s image: %w", v.Name(), err)
+			}
+
 			v.state.MountPoint = ""
 			return v.save()
 		}
diff --git a/test/e2e/config/containers.conf b/test/e2e/config/containers.conf
index 94bb316b10..3cf20268c3 100644
--- a/test/e2e/config/containers.conf
+++ b/test/e2e/config/containers.conf
@@ -76,3 +76,4 @@ testvol6 = "/run/docker/plugins/testvol6.sock"
 testvol7 = "/run/docker/plugins/testvol7.sock"
 testvol8 = "/run/docker/plugins/testvol8.sock"
 testvol9 = "/run/docker/plugins/testvol9.sock"
+image = "/run/docker/plugins/image.sock"
diff --git a/test/e2e/volume_create_test.go b/test/e2e/volume_create_test.go
index 499283cabe..5dfa4d0fcd 100644
--- a/test/e2e/volume_create_test.go
+++ b/test/e2e/volume_create_test.go
@@ -162,4 +162,58 @@ var _ = Describe("Podman volume create", func() {
 		Expect(inspectOpts).Should(Exit(0))
 		Expect(inspectOpts.OutputToString()).To(Equal(optionStrFormatExpect))
 	})
+
+	It("image-backed volume basic functionality", func() {
+		podmanTest.AddImageToRWStore(fedoraMinimal)
+		volName := "testvol"
+		volCreate := podmanTest.Podman([]string{"volume", "create", "--driver", "image", "--opt", fmt.Sprintf("image=%s", fedoraMinimal), volName})
+		volCreate.WaitWithDefaultTimeout()
+		Expect(volCreate).Should(Exit(0))
+
+		runCmd := podmanTest.Podman([]string{"run", "-v", fmt.Sprintf("%s:/test", volName), ALPINE, "cat", "/test/etc/redhat-release"})
+		runCmd.WaitWithDefaultTimeout()
+		Expect(runCmd).Should(Exit(0))
+		Expect(runCmd.OutputToString()).To(ContainSubstring("Fedora"))
+
+		rmCmd := podmanTest.Podman([]string{"rmi", "--force", fedoraMinimal})
+		rmCmd.WaitWithDefaultTimeout()
+		Expect(rmCmd).Should(Exit(0))
+
+		psCmd := podmanTest.Podman([]string{"ps", "-aq"})
+		psCmd.WaitWithDefaultTimeout()
+		Expect(psCmd).Should(Exit(0))
+		Expect(psCmd.OutputToString()).To(BeEmpty())
+
+		volumesCmd := podmanTest.Podman([]string{"volume", "ls", "-q"})
+		volumesCmd.WaitWithDefaultTimeout()
+		Expect(volumesCmd).Should(Exit(0))
+		Expect(volumesCmd.OutputToString()).To(Not(ContainSubstring(volName)))
+	})
+
+	It("image-backed volume force removal", func() {
+		podmanTest.AddImageToRWStore(fedoraMinimal)
+		volName := "testvol"
+		volCreate := podmanTest.Podman([]string{"volume", "create", "--driver", "image", "--opt", fmt.Sprintf("image=%s", fedoraMinimal), volName})
+		volCreate.WaitWithDefaultTimeout()
+		Expect(volCreate).Should(Exit(0))
+
+		runCmd := podmanTest.Podman([]string{"run", "-v", fmt.Sprintf("%s:/test", volName), ALPINE, "cat", "/test/etc/redhat-release"})
+		runCmd.WaitWithDefaultTimeout()
+		Expect(runCmd).Should(Exit(0))
+		Expect(runCmd.OutputToString()).To(ContainSubstring("Fedora"))
+
+		rmCmd := podmanTest.Podman([]string{"volume", "rm", "--force", volName})
+		rmCmd.WaitWithDefaultTimeout()
+		Expect(rmCmd).Should(Exit(0))
+
+		psCmd := podmanTest.Podman([]string{"ps", "-aq"})
+		psCmd.WaitWithDefaultTimeout()
+		Expect(psCmd).Should(Exit(0))
+		Expect(psCmd.OutputToString()).To(BeEmpty())
+
+		volumesCmd := podmanTest.Podman([]string{"volume", "ls", "-q"})
+		volumesCmd.WaitWithDefaultTimeout()
+		Expect(volumesCmd).Should(Exit(0))
+		Expect(volumesCmd.OutputToString()).To(Not(ContainSubstring(volName)))
+	})
 })
diff --git a/test/e2e/volume_plugin_test.go b/test/e2e/volume_plugin_test.go
index 33cdcce5b0..00498431e8 100644
--- a/test/e2e/volume_plugin_test.go
+++ b/test/e2e/volume_plugin_test.go
@@ -60,7 +60,8 @@ var _ = Describe("Podman volume plugins", func() {
 		Expect(err).ToNot(HaveOccurred())
 
 		// Keep this distinct within tests to avoid multiple tests using the same plugin.
-		pluginName := "testvol1"
+		// This one verifies that the "image" plugin uses a volume plugin, not the "image" driver.
+		pluginName := "image"
 		plugin := podmanTest.Podman([]string{"run", "--security-opt", "label=disable", "-v", "/run/docker/plugins:/run/docker/plugins", "-v", fmt.Sprintf("%v:%v", pluginStatePath, pluginStatePath), "-d", volumeTest, "--sock-name", pluginName, "--path", pluginStatePath})
 		plugin.WaitWithDefaultTimeout()
 		Expect(plugin).Should(Exit(0))
@@ -77,6 +78,12 @@ var _ = Describe("Podman volume plugins", func() {
 		Expect(arrOutput).To(HaveLen(1))
 		Expect(arrOutput[0]).To(ContainSubstring(volName))
 
+		// Verify this is not an image volume.
+		inspect := podmanTest.Podman([]string{"volume", "inspect", volName, "--format", "{{.StorageID}}"})
+		inspect.WaitWithDefaultTimeout()
+		Expect(inspect).Should(Exit(0))
+		Expect(inspect.OutputToString()).To(BeEmpty())
+
 		remove := podmanTest.Podman([]string{"volume", "rm", volName})
 		remove.WaitWithDefaultTimeout()
 		Expect(remove).Should(Exit(0))