Add support for volume operations to SQLite state

Signed-off-by: Matt Heon <mheon@redhat.com>
This commit is contained in:
Matt Heon
2023-02-21 15:53:03 -05:00
parent c0b92bdbc7
commit 59a54f32dc
2 changed files with 267 additions and 464 deletions

View File

@ -6,6 +6,7 @@ import (
"fmt"
"path/filepath"
goruntime "runtime"
"strings"
"time"
"github.com/containers/common/libnetwork/types"
@ -481,6 +482,7 @@ func (s *SQLiteState) LookupContainerID(idOrName string) (string, error) {
if err := rows.Scan(&id); err != nil {
return "", fmt.Errorf("retrieving container %q ID from database: %w", idOrName, err)
}
foundResult = true
}
if !foundResult {
return "", define.ErrNoSuchCtr
@ -514,8 +516,9 @@ func (s *SQLiteState) LookupContainer(idOrName string) (*Container, error) {
}
if err := rows.Scan(&rawJSON); err != nil {
return nil, fmt.Errorf("error retrieving container %q ID from database: %w", idOrName, err)
return nil, fmt.Errorf("retrieving container %q ID from database: %w", idOrName, err)
}
foundResult = true
}
if !foundResult {
return nil, define.ErrNoSuchCtr
@ -1014,7 +1017,7 @@ func (s *SQLiteState) RemoveExecSession(session *ExecSession) (defErr error) {
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Rolling back transaction to remove container %s exec session %s: %v", session.ContainerID, session.Id, err)
logrus.Errorf("Rolling back transaction to remove container %s exec session %s: %v", session.ContainerId, session.Id, err)
}
}
}()
@ -1304,6 +1307,7 @@ func (s *SQLiteState) LookupPod(idOrName string) (*Pod, error) {
if err := rows.Scan(&rawJSON); err != nil {
return nil, fmt.Errorf("error retrieving pod %q ID from database: %w", idOrName, err)
}
foundResult = true
}
if !foundResult {
return nil, define.ErrNoSuchPod
@ -2140,8 +2144,7 @@ func (s *SQLiteState) AllPods() ([]*Pod, error) {
// AddVolume adds the given volume to the state. It also adds ctrDepID to
// the sub bucket holding the container dependencies that this volume has
// TODO TODO TODO
func (s *SQLiteState) AddVolume(volume *Volume) error {
func (s *SQLiteState) AddVolume(volume *Volume) (defErr error) {
if !s.valid {
return define.ErrDBClosed
}
@ -2150,187 +2153,110 @@ func (s *SQLiteState) AddVolume(volume *Volume) error {
return define.ErrVolumeRemoved
}
return define.ErrNotImplemented
cfgJSON, err := json.Marshal(volume.config)
if err != nil {
return fmt.Errorf("marshalling volume %s configuration json: %w", volume.Name(), err)
}
// volName := []byte(volume.Name())
volState := volume.state
if volState == nil {
volState = new(VolumeState)
}
// volConfigJSON, err := json.Marshal(volume.config)
// if err != nil {
// return fmt.Errorf("marshalling volume %s config to JSON: %w", volume.Name(), err)
// }
stateJSON, err := json.Marshal(volState)
if err != nil {
return fmt.Errorf("marshalling volume %s state json: %w", volume.Name(), err)
}
// // Volume state is allowed to not exist
// var volStateJSON []byte
// if volume.state != nil {
// volStateJSON, err = json.Marshal(volume.state)
// if err != nil {
// return fmt.Errorf("marshalling volume %s state to JSON: %w", volume.Name(), err)
// }
// }
storageID := sql.NullString{}
if volume.config.StorageID != "" {
storageID.Valid = true
storageID.String = volume.config.StorageID
}
// db, err := s.getDBCon()
// if err != nil {
// return err
// }
// defer s.deferredCloseDBCon(db)
tx, err := s.conn.Begin()
if err != nil {
return fmt.Errorf("beginning volume create transaction: %w", err)
}
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Rolling back transaction to create volume: %v", err)
}
}
}()
// err = db.Update(func(tx *bolt.Tx) error {
// volBkt, err := getVolBucket(tx)
// if err != nil {
// return err
// }
if _, err := tx.Exec("INSERT INTO VolumeConfig VALUES (?, ?, ?);", volume.Name(), storageID, cfgJSON); err != nil {
return fmt.Errorf("adding volume %s config to database: %w", volume.Name(), err)
}
// allVolsBkt, err := getAllVolsBucket(tx)
// if err != nil {
// return err
// }
if _, err := tx.Exec("INSERT INTO VolumeState VALUES (?, ?);", volume.Name(), stateJSON); err != nil {
return fmt.Errorf("adding volume %s state to database: %w", volume.Name(), err)
}
// volCtrsBkt, err := getVolumeContainersBucket(tx)
// if err != nil {
// return err
// }
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing transaction: %w", err)
}
// // Check if we already have a volume with the given name
// volExists := allVolsBkt.Get(volName)
// if volExists != nil {
// return fmt.Errorf("name %s is in use: %w", volume.Name(), define.ErrVolumeExists)
// }
// // We are good to add the volume
// // Make a bucket for it
// newVol, err := volBkt.CreateBucket(volName)
// if err != nil {
// return fmt.Errorf("creating bucket for volume %s: %w", volume.Name(), err)
// }
// // Make a subbucket for the containers using the volume. Dependent container IDs will be addedremoved to
// // this bucket in addcontainer/removeContainer
// if _, err := newVol.CreateBucket(volDependenciesBkt); err != nil {
// return fmt.Errorf("creating bucket for containers using volume %s: %w", volume.Name(), err)
// }
// if err := newVol.Put(configKey, volConfigJSON); err != nil {
// return fmt.Errorf("storing volume %s configuration in DB: %w", volume.Name(), err)
// }
// if volStateJSON != nil {
// if err := newVol.Put(stateKey, volStateJSON); err != nil {
// return fmt.Errorf("storing volume %s state in DB: %w", volume.Name(), err)
// }
// }
// if volume.config.StorageID != "" {
// if err := volCtrsBkt.Put([]byte(volume.config.StorageID), volName); err != nil {
// return fmt.Errorf("storing volume %s container ID in DB: %w", volume.Name(), err)
// }
// }
// if err := allVolsBkt.Put(volName, volName); err != nil {
// return fmt.Errorf("storing volume %s in all volumes bucket in DB: %w", volume.Name(), err)
// }
// return nil
// })
// return err
return nil
}
// RemoveVolume removes the given volume from the state
// TODO TODO TODO
func (s *SQLiteState) RemoveVolume(volume *Volume) error {
func (s *SQLiteState) RemoveVolume(volume *Volume) (defErr error) {
if !s.valid {
return define.ErrDBClosed
}
return define.ErrNotImplemented
tx, err := s.conn.Begin()
if err != nil {
return fmt.Errorf("beginning volume %s removal transaction: %w", volume.Name(), err)
}
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Rolling back transaction to remove volume %s: %v", volume.Name(), err)
}
}
}()
// volName := []byte(volume.Name())
rows, err := tx.Query("SELECT ContainerID FROM ContainerVolume WHERE VolumeName=?;", volume.Name())
if err != nil {
return fmt.Errorf("querying for containers using volume %s: %w", volume.Name(), err)
}
defer rows.Close()
// db, err := s.getDBCon()
// if err != nil {
// return err
// }
// defer s.deferredCloseDBCon(db)
var ctrs []string
for rows.Next() {
var ctr string
if err := rows.Scan(&ctr); err != nil {
return fmt.Errorf("error scanning row for containers using volume %s: %w", volume.Name(), err)
}
ctrs = append(ctrs, ctr)
}
if len(ctrs) > 0 {
return fmt.Errorf("volume %s is in use by containers %s: %w", volume.Name(), strings.Join(ctrs, ","), define.ErrVolumeBeingUsed)
}
// err = db.Update(func(tx *bolt.Tx) error {
// volBkt, err := getVolBucket(tx)
// if err != nil {
// return err
// }
// TODO TODO TODO:
// Need to verify that at least 1 row was deleted from VolumeConfig.
// Otherwise return ErrNoSuchVolume
// allVolsBkt, err := getAllVolsBucket(tx)
// if err != nil {
// return err
// }
if _, err := tx.Exec("DELETE FROM VolumeConfig WHERE Name=?;", volume.Name()); err != nil {
return fmt.Errorf("removing volume %s config from DB: %w", volume.Name(), err)
}
// ctrBkt, err := getCtrBucket(tx)
// if err != nil {
// return err
// }
if _, err := tx.Exec("DELETE FROM VolumeState WHERE Name=?;", volume.Name()); err != nil {
return fmt.Errorf("removing volume %s state from DB: %w", volume.Name(), err)
}
// volCtrIDBkt, err := getVolumeContainersBucket(tx)
// if err != nil {
// return err
// }
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing transaction to remove volume %s: %w", volume.Name(), err)
}
// // Check if the volume exists
// volDB := volBkt.Bucket(volName)
// if volDB == nil {
// volume.valid = false
// return fmt.Errorf("volume %s does not exist in DB: %w", volume.Name(), define.ErrNoSuchVolume)
// }
// // Check if volume is not being used by any container
// // This should never be nil
// // But if it is, we can assume that no containers are using
// // the volume.
// volCtrsBkt := volDB.Bucket(volDependenciesBkt)
// if volCtrsBkt != nil {
// var deps []string
// err = volCtrsBkt.ForEach(func(id, value []byte) error {
// // Alright, this is ugly.
// // But we need it to work around the change in
// // volume dependency handling, to make sure that
// // older Podman versions don't cause DB
// // corruption.
// // Look up all dependencies and see that they
// // still exist before appending.
// ctrExists := ctrBkt.Bucket(id)
// if ctrExists == nil {
// return nil
// }
// deps = append(deps, string(id))
// return nil
// })
// if err != nil {
// return fmt.Errorf("getting list of dependencies from dependencies bucket for volumes %q: %w", volume.Name(), err)
// }
// if len(deps) > 0 {
// return fmt.Errorf("volume %s is being used by container(s) %s: %w", volume.Name(), strings.Join(deps, ","), define.ErrVolumeBeingUsed)
// }
// }
// // volume is ready for removal
// // Let's kick it out
// if err := allVolsBkt.Delete(volName); err != nil {
// return fmt.Errorf("removing volume %s from all volumes bucket in DB: %w", volume.Name(), err)
// }
// if err := volBkt.DeleteBucket(volName); err != nil {
// return fmt.Errorf("removing volume %s from DB: %w", volume.Name(), err)
// }
// if volume.config.StorageID != "" {
// if err := volCtrIDBkt.Delete([]byte(volume.config.StorageID)); err != nil {
// return fmt.Errorf("removing volume %s container ID from DB: %w", volume.Name(), err)
// }
// }
// return nil
// })
// return err
return nil
}
// UpdateVolume updates the volume's state from the database.
// TODO TODO TODO
func (s *SQLiteState) UpdateVolume(volume *Volume) error {
if !s.valid {
return define.ErrDBClosed
@ -2340,53 +2266,29 @@ func (s *SQLiteState) UpdateVolume(volume *Volume) error {
return define.ErrVolumeRemoved
}
return define.ErrNotImplemented
row := s.conn.QueryRow("SELECT JSON FROM VolumeState WHERE Name=?;", volume.Name())
// newState := new(VolumeState)
// volumeName := []byte(volume.Name())
var stateJSON string
if err := row.Scan(&stateJSON); err != nil {
if errors.Is(err, sql.ErrNoRows) {
volume.valid = false
return define.ErrNoSuchVolume
}
return fmt.Errorf("scanning volume %s state JSON: %w", volume.Name(), err)
}
// db, err := s.getDBCon()
// if err != nil {
// return err
// }
// defer s.deferredCloseDBCon(db)
newState := new(VolumeState)
if err := json.Unmarshal([]byte(stateJSON), newState); err != nil {
return fmt.Errorf("unmarshalling volume %s state: %w", volume.Name(), err)
}
// err = db.View(func(tx *bolt.Tx) error {
// volBucket, err := getVolBucket(tx)
// if err != nil {
// return err
// }
volume.state = newState
// volToUpdate := volBucket.Bucket(volumeName)
// if volToUpdate == nil {
// volume.valid = false
// return fmt.Errorf("no volume with name %s found in database: %w", volume.Name(), define.ErrNoSuchVolume)
// }
// stateBytes := volToUpdate.Get(stateKey)
// if stateBytes == nil {
// // Having no state is valid.
// // Return nil, use the empty state.
// return nil
// }
// if err := json.Unmarshal(stateBytes, newState); err != nil {
// return fmt.Errorf("unmarshalling volume %s state: %w", volume.Name(), err)
// }
// return nil
// })
// if err != nil {
// return err
// }
// volume.state = newState
// return nil
return nil
}
// SaveVolume saves the volume's state to the database.
func (s *SQLiteState) SaveVolume(volume *Volume) error {
func (s *SQLiteState) SaveVolume(volume *Volume) (defErr error) {
if !s.valid {
return define.ErrDBClosed
}
@ -2395,102 +2297,82 @@ func (s *SQLiteState) SaveVolume(volume *Volume) error {
return define.ErrVolumeRemoved
}
return define.ErrNotImplemented
stateJSON, err := json.Marshal(volume.state)
if err != nil {
return fmt.Errorf("marshalling volume %s state JSON: %w", volume.Name(), err)
}
// volumeName := []byte(volume.Name())
tx, err := s.conn.Begin()
if err != nil {
return fmt.Errorf("beginning transaction to rewrite volume %s state: %w", volume.Name(), err)
}
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Rolling back transaction to rewrite volume %s state: %v", volume.Name(), err)
}
}
}()
// var newStateJSON []byte
// if volume.state != nil {
// stateJSON, err := json.Marshal(volume.state)
// if err != nil {
// return fmt.Errorf("marshalling volume %s state to JSON: %w", volume.Name(), err)
// }
// newStateJSON = stateJSON
// }
results, err := tx.Exec("UPDATE TABLE VolumeState SET JSON=? WHERE Name=?;", stateJSON, volume.Name())
if err != nil {
return fmt.Errorf("updating volume %s state in DB: %w", volume.Name(), err)
}
rows, err := results.RowsAffected()
if err != nil {
return fmt.Errorf("retrieving volume %s state rewrite rows affected: %w", volume.Name(), err)
}
if rows == 0 {
volume.valid = false
return define.ErrNoSuchVolume
}
// db, err := s.getDBCon()
// if err != nil {
// return err
// }
// defer s.deferredCloseDBCon(db)
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing transaction to rewrite volume %s state: %w", volume.Name(), err)
}
// err = db.Update(func(tx *bolt.Tx) error {
// volBucket, err := getVolBucket(tx)
// if err != nil {
// return err
// }
// volToUpdate := volBucket.Bucket(volumeName)
// if volToUpdate == nil {
// volume.valid = false
// return fmt.Errorf("no volume with name %s found in database: %w", volume.Name(), define.ErrNoSuchVolume)
// }
// return volToUpdate.Put(stateKey, newStateJSON)
// })
// return err
return nil
}
// AllVolumes returns all volumes present in the state
// TODO TODO TODO
// AllVolumes returns all volumes present in the state.
func (s *SQLiteState) AllVolumes() ([]*Volume, error) {
if !s.valid {
return nil, define.ErrDBClosed
}
return nil, define.ErrNotImplemented
rows, err := s.conn.Query("SELECT JSON FROM VolumeConfig;")
if err != nil {
return nil, fmt.Errorf("querying database for all volumes: %w", err)
}
defer rows.Close()
// volumes := []*Volume{}
var volumes []*Volume
// db, err := s.getDBCon()
// if err != nil {
// return nil, err
// }
// defer s.deferredCloseDBCon(db)
for rows.Next() {
var configJSON string
if err := rows.Scan(&configJSON); err != nil {
return nil, fmt.Errorf("scanning volume config from database: %w", err)
}
vol := new(Volume)
vol.config = new(VolumeConfig)
vol.state = new(VolumeState)
vol.runtime = s.runtime
// err = db.View(func(tx *bolt.Tx) error {
// allVolsBucket, err := getAllVolsBucket(tx)
// if err != nil {
// return err
// }
if err := json.Unmarshal([]byte(configJSON), vol.config); err != nil {
return nil, fmt.Errorf("unmarshalling volume config: %w", err)
}
// volBucket, err := getVolBucket(tx)
// if err != nil {
// return err
// }
// err = allVolsBucket.ForEach(func(id, name []byte) error {
// volExists := volBucket.Bucket(id)
// // This check can be removed if performance becomes an
// // issue, but much less helpful errors will be produced
// if volExists == nil {
// return fmt.Errorf("inconsistency in state - volume %s is in all volumes bucket but volume not found: %w", string(id), define.ErrInternal)
// }
if err := finalizeVolumeSqlite(vol); err != nil {
return nil, err
}
// volume := new(Volume)
// volume.config = new(VolumeConfig)
// volume.state = new(VolumeState)
volumes = append(volumes, vol)
}
// if err := s.getVolumeFromDB(id, volume, volBucket); err != nil {
// if !errors.Is(err, define.ErrNSMismatch) {
// logrus.Errorf("Retrieving volume %s from the database: %v", string(id), err)
// }
// } else {
// volumes = append(volumes, volume)
// }
// return nil
// })
// return err
// })
// if err != nil {
// return nil, err
// }
// return volumes, nil
return volumes, nil
}
// Volume retrieves a volume from full name
// TODO TODO TODO
// Volume retrieves a volume from full name.
func (s *SQLiteState) Volume(name string) (*Volume, error) {
if name == "" {
return nil, define.ErrEmptyID
@ -2500,37 +2382,33 @@ func (s *SQLiteState) Volume(name string) (*Volume, error) {
return nil, define.ErrDBClosed
}
return nil, define.ErrNotImplemented
row := s.conn.QueryRow("SELECT JSON FROM VolumeConfig WHERE Name=?;", name)
// volName := []byte(name)
var configJSON string
// volume := new(Volume)
// volume.config = new(VolumeConfig)
// volume.state = new(VolumeState)
if err := row.Scan(&configJSON); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, define.ErrNoSuchVolume
}
}
// db, err := s.getDBCon()
// if err != nil {
// return nil, err
// }
// defer s.deferredCloseDBCon(db)
vol := new(Volume)
vol.config = new(VolumeConfig)
vol.state = new(VolumeState)
vol.runtime = s.runtime
// err = db.View(func(tx *bolt.Tx) error {
// volBkt, err := getVolBucket(tx)
// if err != nil {
// return err
// }
if err := json.Unmarshal([]byte(configJSON), vol.config); err != nil {
return nil, fmt.Errorf("unmarshalling volume %s config JSON: %w", name, err)
}
// return s.getVolumeFromDB(volName, volume, volBkt)
// })
// if err != nil {
// return nil, err
// }
if err := finalizeVolumeSqlite(vol); err != nil {
return nil, err
}
// return volume, nil
return vol, nil
}
// LookupVolume locates a volume from a partial name.
// TODO TODO TODO
// LookupVolume locates a volume from a unique partial name.
func (s *SQLiteState) LookupVolume(name string) (*Volume, error) {
if name == "" {
return nil, define.ErrEmptyID
@ -2540,68 +2418,45 @@ func (s *SQLiteState) LookupVolume(name string) (*Volume, error) {
return nil, define.ErrDBClosed
}
return nil, define.ErrNotImplemented
rows, err := s.conn.Query("SELECT JSON FROM VolumeConfig WHERE Name LIKE ?;", name)
if err != nil {
return nil, fmt.Errorf("querying database for volume %s: %w", name, err)
}
defer rows.Close()
// volName := []byte(name)
var configJSON string
foundResult := false
for rows.Next() {
if foundResult {
return nil, fmt.Errorf("more than one result for volume name %s: %w", name, define.ErrVolumeExists)
}
if err := rows.Scan(&configJSON); err != nil {
return nil, fmt.Errorf("retrieving volume %s config from database: %w", name, err)
}
foundResult = true
}
if !foundResult {
return nil, define.ErrNoSuchVolume
}
// volume := new(Volume)
// volume.config = new(VolumeConfig)
// volume.state = new(VolumeState)
vol := new(Volume)
vol.config = new(VolumeConfig)
vol.state = new(VolumeState)
vol.runtime = s.runtime
// db, err := s.getDBCon()
// if err != nil {
// return nil, err
// }
// defer s.deferredCloseDBCon(db)
if err := json.Unmarshal([]byte(configJSON), vol.config); err != nil {
return nil, fmt.Errorf("unmarshalling volume %s config JSON: %w", name, err)
}
// err = db.View(func(tx *bolt.Tx) error {
// volBkt, err := getVolBucket(tx)
// if err != nil {
// return err
// }
if err := finalizeVolumeSqlite(vol); err != nil {
return nil, err
}
// allVolsBkt, err := getAllVolsBucket(tx)
// if err != nil {
// return err
// }
// // Check for exact match on name
// volDB := volBkt.Bucket(volName)
// if volDB != nil {
// return s.getVolumeFromDB(volName, volume, volBkt)
// }
// // No exact match. Search all names.
// foundMatch := false
// err = allVolsBkt.ForEach(func(checkName, checkName2 []byte) error {
// if strings.HasPrefix(string(checkName), name) {
// if foundMatch {
// return fmt.Errorf("more than one result for volume name %q: %w", name, define.ErrVolumeExists)
// }
// foundMatch = true
// volName = checkName
// }
// return nil
// })
// if err != nil {
// return err
// }
// if !foundMatch {
// return fmt.Errorf("no volume with name %q found: %w", name, define.ErrNoSuchVolume)
// }
// return s.getVolumeFromDB(volName, volume, volBkt)
// })
// if err != nil {
// return nil, err
// }
// return volume, nil
return vol, nil
}
// HasVolume returns true if the given volume exists in the state, otherwise it returns false
// TODO TODO TODO
// HasVolume returns true if the given volume exists in the state.
// Otherwise it returns false.
func (s *SQLiteState) HasVolume(name string) (bool, error) {
if name == "" {
return false, define.ErrEmptyID
@ -2611,42 +2466,25 @@ func (s *SQLiteState) HasVolume(name string) (bool, error) {
return false, define.ErrDBClosed
}
return false, define.ErrNotImplemented
row := s.conn.QueryRow("SELECT 1 FROM VolumeConfig WHERE Name=?;", name)
// volName := []byte(name)
var check int
if err := row.Scan(&check); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, fmt.Errorf("looking up volume %s in database: %w", name, err)
}
if check != 1 {
return false, fmt.Errorf("check digit for volume %s lookup incorrect: %w", name, define.ErrInternal)
}
// exists := false
// db, err := s.getDBCon()
// if err != nil {
// return false, err
// }
// defer s.deferredCloseDBCon(db)
// err = db.View(func(tx *bolt.Tx) error {
// volBkt, err := getVolBucket(tx)
// if err != nil {
// return err
// }
// volDB := volBkt.Bucket(volName)
// if volDB != nil {
// exists = true
// }
// return nil
// })
// if err != nil {
// return false, err
// }
// return exists, nil
return true, nil
}
// VolumeInUse checks if any container is using the volume
// VolumeInUse checks if any container is using the volume.
// It returns a slice of the IDs of the containers using the given
// volume. If the slice is empty, no containers use the given volume
// TODO TODO TODO
// volume. If the slice is empty, no containers use the given volume.
func (s *SQLiteState) VolumeInUse(volume *Volume) ([]string, error) {
if !s.valid {
return nil, define.ErrDBClosed
@ -2656,94 +2494,42 @@ func (s *SQLiteState) VolumeInUse(volume *Volume) ([]string, error) {
return nil, define.ErrVolumeRemoved
}
return nil, define.ErrNotImplemented
rows, err := s.conn.Query("SELECT ContainerID FROM ContainerVolume WHERE VolumeName=?;", volume.Name())
if err != nil {
return nil, fmt.Errorf("querying database for containers using volume %s: %w", volume.Name(), err)
}
defer rows.Close()
// depCtrs := []string{}
var ctrs []string
for rows.Next() {
var ctr string
if err := rows.Scan(&ctr); err != nil {
return nil, fmt.Errorf("scanning container ID for container using volume %s: %w", volume.Name(), err)
}
ctrs = append(ctrs, ctr)
}
// db, err := s.getDBCon()
// if err != nil {
// return nil, err
// }
// defer s.deferredCloseDBCon(db)
// err = db.View(func(tx *bolt.Tx) error {
// volBucket, err := getVolBucket(tx)
// if err != nil {
// return err
// }
// ctrBucket, err := getCtrBucket(tx)
// if err != nil {
// return err
// }
// volDB := volBucket.Bucket([]byte(volume.Name()))
// if volDB == nil {
// volume.valid = false
// return fmt.Errorf("no volume with name %s found in DB: %w", volume.Name(), define.ErrNoSuchVolume)
// }
// dependsBkt := volDB.Bucket(volDependenciesBkt)
// if dependsBkt == nil {
// return fmt.Errorf("volume %s has no dependencies bucket: %w", volume.Name(), define.ErrInternal)
// }
// // Iterate through and add dependencies
// err = dependsBkt.ForEach(func(id, value []byte) error {
// // Look up all dependencies and see that they
// // still exist before appending.
// ctrExists := ctrBucket.Bucket(id)
// if ctrExists == nil {
// return nil
// }
// depCtrs = append(depCtrs, string(id))
// return nil
// })
// if err != nil {
// return err
// }
// return nil
// })
// if err != nil {
// return nil, err
// }
// return depCtrs, nil
return ctrs, nil
}
// ContainerIDIsVolume checks if the given c/storage container ID is used as
// backing storage for a volume.
// TODO TODO TODO
func (s *SQLiteState) ContainerIDIsVolume(id string) (bool, error) {
if !s.valid {
return false, define.ErrDBClosed
}
return false, define.ErrNotImplemented
row := s.conn.QueryRow("SELECT 1 FROM VolumeConfig WHERE StorageID=?;", id)
var checkDigit int
if err := row.Scan(&checkDigit); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, fmt.Errorf("error retrieving volumes using storage ID %s: %w", id, err)
}
if checkDigit != 1 {
return false, fmt.Errorf("check digit for volumes using storage ID %s was incorrect: %w", id, define.ErrInternal)
}
// isVol := false
// db, err := s.getDBCon()
// if err != nil {
// return false, err
// }
// defer s.deferredCloseDBCon(db)
// err = db.View(func(tx *bolt.Tx) error {
// volCtrsBkt, err := getVolumeContainersBucket(tx)
// if err != nil {
// return err
// }
// volName := volCtrsBkt.Get([]byte(id))
// if volName != nil {
// isVol = true
// }
// return nil
// })
// return isVol, err
return true, nil
}

View File

@ -279,6 +279,20 @@ func finalizePodSqlite(pod *Pod) error {
return nil
}
// Finalize a volume that was pulled out of the database
func finalizeVolumeSqlite(vol *Volume) error {
// Get the lock
lock, err := vol.runtime.lockManager.RetrieveLock(vol.config.LockID)
if err != nil {
return fmt.Errorf("retrieving lock for volume %s: %w", vol.Name(), err)
}
vol.lock = lock
vol.valid = true
return nil
}
func (s *SQLiteState) rewriteContainerConfig(ctr *Container, newCfg *ContainerConfig) (defErr error) {
json, err := json.Marshal(newCfg)
if err != nil {
@ -404,6 +418,9 @@ func (s *SQLiteState) removeContainer(ctr *Container) (defErr error) {
}
}()
// TODO TODO TODO:
// Need to verify that at least 1 row was deleted from ContainerConfig.
// Otherwise return ErrNoSuchCtr.
if _, err := tx.Exec("DELETE FROM IDNamespace WHERE ID=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s id from database: %w", ctr.ID(), err)
}