Some further work on SQLite state

- Added a mechanism to check schema version and migrate
  (no migrations yet since schema hasn't changed yet).
- Added pod support to AddContainer, and unified AddContainer and
  RemoveContainer between containers and pods.
- Fixed newly-added GetPodName and GetCtrName in BoltDB so they
  only return pod/container names.

Signed-off-by: Matt Heon <mheon@redhat.com>
This commit is contained in:
Matt Heon
2023-02-20 13:47:17 -05:00
parent 5c2a0670fc
commit b4c4f9c93d
3 changed files with 170 additions and 96 deletions

View File

@ -490,7 +490,6 @@ func (s *BoltState) ValidateDBConfig(runtime *Runtime) error {
// GetContainerName returns the name associated with a given ID.
// Returns ErrNoSuchCtr if the ID does not exist.
// TODO TODO TODO: Rewrite this to only retrieve containers.
func (s *BoltState) GetContainerName(id string) (string, error) {
if id == "" {
return "", define.ErrEmptyID
@ -516,11 +515,21 @@ func (s *BoltState) GetContainerName(id string) (string, error) {
return err
}
ctrsBkt, err := getCtrBucket(tx)
if err != nil {
return err
}
nameBytes := idBkt.Get(idBytes)
if nameBytes == nil {
return define.ErrNoSuchCtr
}
ctrExists := ctrsBkt.Bucket(idBytes)
if ctrExists == nil {
return define.ErrNoSuchCtr
}
name = string(nameBytes)
return nil
})
@ -533,7 +542,6 @@ func (s *BoltState) GetContainerName(id string) (string, error) {
// GetPodName returns the name associated with a given ID.
// Returns ErrNoSuchPor if the ID does not exist.
// TODO TODO TODO: Rewrite this to only retrieve pods
func (s *BoltState) GetPodName(id string) (string, error) {
if id == "" {
return "", define.ErrEmptyID
@ -559,11 +567,21 @@ func (s *BoltState) GetPodName(id string) (string, error) {
return err
}
podBkt, err := getPodBucket(tx)
if err != nil {
return err
}
nameBytes := idBkt.Get(idBytes)
if nameBytes == nil {
return define.ErrNoSuchPod
}
podExists := podBkt.Bucket(idBytes)
if podExists == nil {
return define.ErrNoSuchPod
}
name = string(nameBytes)
return nil
})

View File

@ -58,7 +58,9 @@ func NewSqliteState(runtime *Runtime) (_ State, defErr error) {
return nil, fmt.Errorf("setting full fsync mode in db: %w", err)
}
// TODO: Check schema version here and perform migration if necessary
if err := state.migrateSchemaIfNecessary(); err != nil {
return nil, err
}
// Set up tables
if err := sqliteInitTables(state.conn); err != nil {
@ -555,7 +557,7 @@ func (s *SQLiteState) HasContainer(id string) (bool, error) {
// AddContainer adds a container to the state
// The container being added cannot belong to a pod
func (s *SQLiteState) AddContainer(ctr *Container) (defErr error) {
func (s *SQLiteState) AddContainer(ctr *Container) error {
if !s.valid {
return define.ErrDBClosed
}
@ -568,63 +570,13 @@ func (s *SQLiteState) AddContainer(ctr *Container) (defErr error) {
return fmt.Errorf("cannot add a container that belongs to a pod with AddContainer - use AddContainerToPod: %w", define.ErrInvalidArg)
}
configJSON, err := json.Marshal(ctr.config)
if err != nil {
return fmt.Errorf("marshalling container config json: %w", err)
}
stateJSON, err := json.Marshal(ctr.state)
if err != nil {
return fmt.Errorf("marshalling container state json: %w", err)
}
deps := ctr.Dependencies()
// TODO: Verify all dependencies are part of the same pod as this
// container
tx, err := s.conn.Begin()
if err != nil {
return fmt.Errorf("beginning container create transaction: %w", err)
}
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Error rolling back transaction to create container: %v", err)
}
}
}()
if _, err := tx.Exec("INSERT INTO IDNamespace VALUES (?);", ctr.ID()); err != nil {
return fmt.Errorf("adding container id to database: %w", err)
}
if _, err := tx.Exec("INSERT INTO ContainerConfig VALUES (?, ?, ?, ?);", ctr.ID(), ctr.Name(), sql.NullString{}, configJSON); err != nil {
return fmt.Errorf("adding container config to database: %w", err)
}
if _, err := tx.Exec("INSERT INTO ContainerState VALUES (?, ?, ?, ?);", ctr.ID(), int(ctr.state.State), ctr.state.ExitCode, stateJSON); err != nil {
return fmt.Errorf("adding container state to database: %w", err)
}
for _, dep := range deps {
if _, err := tx.Exec("INSERT INTO ContainerDependency VALUES (?, ?);", ctr.ID(), dep); err != nil {
return fmt.Errorf("adding container dependency %s to database: %w", dep, err)
}
}
for _, vol := range ctr.config.NamedVolumes {
if _, err := tx.Exec("INSERT INTO ContainerVolume VALUES (?, ?);", ctr.ID(), vol.Name); err != nil {
return fmt.Errorf("adding container volume %s to database: %w", vol.Name, err)
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing transaction: %w", err)
}
return nil
return s.addContainer(ctr)
}
// RemoveContainer removes a container from the state
// Only removes containers not in pods - for containers that are a member of a
// pod, use RemoveContainerFromPod
func (s *SQLiteState) RemoveContainer(ctr *Container) (defErr error) {
func (s *SQLiteState) RemoveContainer(ctr *Container) error {
if !s.valid {
return define.ErrDBClosed
}
@ -633,42 +585,7 @@ func (s *SQLiteState) RemoveContainer(ctr *Container) (defErr error) {
return fmt.Errorf("container %s is part of a pod, use RemoveContainerFromPod instead: %w", ctr.ID(), define.ErrPodExists)
}
tx, err := s.conn.Begin()
if err != nil {
return fmt.Errorf("beginning container %s removal transaction: %w", ctr.ID(), err)
}
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Error rolling back transaction to remove container %s: %v", ctr.ID(), err)
}
}
}()
if _, err := tx.Exec("DELETE FROM IDNamespace WHERE Id=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s id from database: %w", ctr.ID(), err)
}
if _, err := tx.Exec("DELETE FROM ContainerConfig WHERE Id=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s config from database: %w", ctr.ID(), err)
}
if _, err := tx.Exec("DELETE FROM ContainerState WHERE Id=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s state from database: %w", ctr.ID(), err)
}
if _, err := tx.Exec("DELETE FROM ContainerDependency WHERE Id=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s dependencies from database: %w", ctr.ID(), err)
}
if _, err := tx.Exec("DELETE FROM ContainerVolume WHERE ContainerID=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s volumes from database: %w", ctr.ID(), err)
}
if _, err := tx.Exec("DELETE FROM ContainerExecSession WHERE ContainerID=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s exec sessions from database: %w", ctr.ID(), err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing container %s removal transaction: %w", ctr.ID(), err)
}
return nil
return s.removeContainer(ctr)
}
// UpdateContainer updates a container's state from the database
@ -2263,7 +2180,6 @@ func (s *SQLiteState) RemovePodContainers(pod *Pod) error {
// AddContainerToPod adds the given container to an existing pod
// The container will be added to the state and the pod
// TODO TODO TODO
func (s *SQLiteState) AddContainerToPod(pod *Pod, ctr *Container) error {
if !s.valid {
return define.ErrDBClosed
@ -2281,12 +2197,11 @@ func (s *SQLiteState) AddContainerToPod(pod *Pod, ctr *Container) error {
return fmt.Errorf("container %s is not part of pod %s: %w", ctr.ID(), pod.ID(), define.ErrNoSuchCtr)
}
return define.ErrNotImplemented
return s.addContainer(ctr)
}
// RemoveContainerFromPod removes a container from an existing pod
// The container will also be removed from the state
// TODO TODO TODO
func (s *SQLiteState) RemoveContainerFromPod(pod *Pod, ctr *Container) error {
if !s.valid {
return define.ErrDBClosed
@ -2304,7 +2219,7 @@ func (s *SQLiteState) RemoveContainerFromPod(pod *Pod, ctr *Container) error {
return fmt.Errorf("container %s is not part of pod %s: %w", ctr.ID(), pod.ID(), define.ErrInvalidArg)
}
return define.ErrNotImplemented
return s.removeContainer(ctr)
}
// UpdatePod updates a pod's state from the database.

View File

@ -13,6 +13,35 @@ import (
_ "github.com/mattn/go-sqlite3"
)
func (s *SQLiteState) migrateSchemaIfNecessary() (defErr error) {
row := s.conn.QueryRow("SELECT SchemaVersion FROM DBConfig;")
var schemaVer int
if err := row.Scan(&schemaVer); err != nil {
if errors.Is(err, sql.ErrNoRows) {
// Brand-new, unpopulated DB.
// Schema was just created, so it has to be the latest.
return nil
}
}
// If the schema version 0 or less, it's invalid
if schemaVer <= 0 {
return fmt.Errorf("database schema version %d is invalid: %w", schemaVer, define.ErrInternal)
}
if schemaVer != schemaVersion {
// If the DB is a later schema than we support, we have to error
if schemaVer > schemaVersion {
return fmt.Errorf("database has schema version %d while this libpod version only supports version %d: %w",
schemaVer, schemaVersion, define.ErrInternal)
}
// Perform schema migration here, one version at a time.
}
return nil
}
// Initialize all required tables for the SQLite state
func sqliteInitTables(conn *sql.DB) (defErr error) {
// Technically we could split the "CREATE TABLE IF NOT EXISTS" and ");"
@ -286,3 +315,115 @@ func (s *SQLiteState) rewriteContainerConfig(ctr *Container, newCfg *ContainerCo
return nil
}
func (s *SQLiteState) addContainer(ctr *Container) (defErr error) {
configJSON, err := json.Marshal(ctr.config)
if err != nil {
return fmt.Errorf("marshalling container config json: %w", err)
}
stateJSON, err := json.Marshal(ctr.state)
if err != nil {
return fmt.Errorf("marshalling container state json: %w", err)
}
deps := ctr.Dependencies()
pod := sql.NullString{}
if ctr.config.Pod != "" {
pod.Valid = true
pod.String = ctr.config.Pod
}
tx, err := s.conn.Begin()
if err != nil {
return fmt.Errorf("beginning container create transaction: %w", err)
}
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Error rolling back transaction to create container: %v", err)
}
}
}()
if _, err := tx.Exec("INSERT INTO IDNamespace VALUES (?);", ctr.ID()); err != nil {
return fmt.Errorf("adding container id to database: %w", err)
}
if _, err := tx.Exec("INSERT INTO ContainerConfig VALUES (?, ?, ?, ?);", ctr.ID(), ctr.Name(), pod, configJSON); err != nil {
return fmt.Errorf("adding container config to database: %w", err)
}
if _, err := tx.Exec("INSERT INTO ContainerState VALUES (?, ?, ?, ?);", ctr.ID(), int(ctr.state.State), ctr.state.ExitCode, stateJSON); err != nil {
return fmt.Errorf("adding container state to database: %w", err)
}
for _, dep := range deps {
// Check if the dependency is in the same pod
var depPod sql.NullString
row := tx.QueryRow("SELECT PodID FROM ContainerConfig WHERE Id=?;", dep)
if err := row.Scan(&depPod); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("container dependency %s does not exist in database: %w", dep, define.ErrNoSuchCtr)
}
}
if ctr.config.Pod == "" && depPod.Valid {
return fmt.Errorf("container dependency %s is part of a pod, but container is not: %w", dep, define.ErrInvalidArg)
} else if ctr.config.Pod != "" && !depPod.Valid {
return fmt.Errorf("container dependency %s is not part of a pod, but this container belongs to pod %s", dep, ctr.config.Pod, define.ErrInvalidArg)
} else if ctr.config.Pod != "" && depPod.String != ctr.config.Pod {
return fmt.Errorf("container dependency %s is part of pod %s but container is part of pod %s, pods must match: %w", dep, depPod.String, ctr.config.Pod, define.ErrInvalidArg)
}
if _, err := tx.Exec("INSERT INTO ContainerDependency VALUES (?, ?);", ctr.ID(), dep); err != nil {
return fmt.Errorf("adding container dependency %s to database: %w", dep, err)
}
}
for _, vol := range ctr.config.NamedVolumes {
if _, err := tx.Exec("INSERT INTO ContainerVolume VALUES (?, ?);", ctr.ID(), vol.Name); err != nil {
return fmt.Errorf("adding container volume %s to database: %w", vol.Name, err)
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing transaction: %w", err)
}
return nil
}
func (s *SQLiteState) removeContainer(ctr *Container) (defErr error) {
tx, err := s.conn.Begin()
if err != nil {
return fmt.Errorf("beginning container %s removal transaction: %w", ctr.ID(), err)
}
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Error rolling back transaction to remove container %s: %v", ctr.ID(), err)
}
}
}()
if _, err := tx.Exec("DELETE FROM IDNamespace WHERE Id=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s id from database: %w", ctr.ID(), err)
}
if _, err := tx.Exec("DELETE FROM ContainerConfig WHERE Id=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s config from database: %w", ctr.ID(), err)
}
if _, err := tx.Exec("DELETE FROM ContainerState WHERE Id=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s state from database: %w", ctr.ID(), err)
}
if _, err := tx.Exec("DELETE FROM ContainerDependency WHERE Id=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s dependencies from database: %w", ctr.ID(), err)
}
if _, err := tx.Exec("DELETE FROM ContainerVolume WHERE ContainerID=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s volumes from database: %w", ctr.ID(), err)
}
if _, err := tx.Exec("DELETE FROM ContainerExecSession WHERE ContainerID=?;", ctr.ID()); err != nil {
return fmt.Errorf("removing container %s exec sessions from database: %w", ctr.ID(), err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("committing container %s removal transaction: %w", ctr.ID(), err)
}
return nil
}