package libpod import ( "database/sql" "errors" "fmt" "os" "path/filepath" goruntime "runtime" "strings" "time" "github.com/containers/common/libnetwork/types" "github.com/containers/podman/v4/libpod/define" "github.com/containers/podman/v4/pkg/rootless" "github.com/containers/storage" "github.com/sirupsen/logrus" // SQLite backend for database/sql _ "github.com/mattn/go-sqlite3" ) const schemaVersion = 1 // SQLiteState is a state implementation backed by a SQLite database type SQLiteState struct { valid bool conn *sql.DB runtime *Runtime } const ( // Deal with timezone automatically. sqliteOptionLocation = "_loc=auto" // Force an fsync after each transaction (https://www.sqlite.org/pragma.html#pragma_synchronous). sqliteOptionSynchronous = "&_sync=FULL" // Allow foreign keys (https://www.sqlite.org/pragma.html#pragma_foreign_keys). sqliteOptionForeignKeys = "&_foreign_keys=1" // Make sure that transactions happen exclusively. sqliteOptionTXLock = "&_txlock=exclusive" // Assembled sqlite options used when opening the database. sqliteOptions = "db.sql?" + sqliteOptionLocation + sqliteOptionSynchronous + sqliteOptionForeignKeys + sqliteOptionTXLock ) // NewSqliteState creates a new SQLite-backed state database. func NewSqliteState(runtime *Runtime) (_ State, defErr error) { state := new(SQLiteState) basePath := runtime.storageConfig.GraphRoot if runtime.storageConfig.TransientStore { basePath = runtime.storageConfig.RunRoot } // c/storage is set up *after* the DB - so even though we use the c/s // root (or, for transient, runroot) dir, we need to make the dir // ourselves. if err := os.MkdirAll(basePath, 0700); err != nil { return nil, fmt.Errorf("creating root directory: %w", err) } conn, err := sql.Open("sqlite3", filepath.Join(basePath, sqliteOptions)) if err != nil { return nil, fmt.Errorf("initializing sqlite database: %w", err) } defer func() { if defErr != nil { if err := conn.Close(); err != nil { logrus.Errorf("Error closing SQLite DB connection: %v", err) } } }() if err := initSQLiteDB(conn); err != nil { return nil, err } state.conn = conn state.valid = true state.runtime = runtime return state, nil } // Close closes the state and prevents further use func (s *SQLiteState) Close() error { if err := s.conn.Close(); err != nil { return err } s.valid = false return nil } // Refresh clears container and pod states after a reboot func (s *SQLiteState) Refresh() (defErr error) { if !s.valid { return define.ErrDBClosed } // Retrieve all containers, pods, and volumes. // Maps are indexed by ID (or volume name) so we know which goes where, // and store the marshalled state JSON ctrStates := make(map[string]string) podStates := make(map[string]string) volumeStates := make(map[string]string) ctrRows, err := s.conn.Query("SELECT ID, JSON FROM ContainerState;") if err != nil { return fmt.Errorf("querying for container states: %w", err) } defer ctrRows.Close() for ctrRows.Next() { var ( id, stateJSON string ) if err := ctrRows.Scan(&id, &stateJSON); err != nil { return fmt.Errorf("scanning container state row: %w", err) } ctrState := new(ContainerState) if err := json.Unmarshal([]byte(stateJSON), ctrState); err != nil { return fmt.Errorf("unmarshalling container state json: %w", err) } // Refresh the state resetContainerState(ctrState) newJSON, err := json.Marshal(ctrState) if err != nil { return fmt.Errorf("marshalling container state json: %w", err) } ctrStates[id] = string(newJSON) } podRows, err := s.conn.Query("SELECT ID, JSON FROM PodState;") if err != nil { return fmt.Errorf("querying for pod states: %w", err) } defer podRows.Close() for podRows.Next() { var ( id, stateJSON string ) if err := podRows.Scan(&id, &stateJSON); err != nil { return fmt.Errorf("scanning pod state row: %w", err) } podState := new(podState) if err := json.Unmarshal([]byte(stateJSON), podState); err != nil { return fmt.Errorf("unmarshalling pod state json: %w", err) } // Refresh the state resetPodState(podState) newJSON, err := json.Marshal(podState) if err != nil { return fmt.Errorf("marshalling pod state json: %w", err) } podStates[id] = string(newJSON) } volRows, err := s.conn.Query("SELECT Name, JSON FROM VolumeState;") if err != nil { return fmt.Errorf("querying for volume states: %w", err) } defer volRows.Close() for volRows.Next() { var ( name, stateJSON string ) if err := volRows.Scan(&name, &stateJSON); err != nil { return fmt.Errorf("scanning volume state row: %w", err) } volState := new(VolumeState) if err := json.Unmarshal([]byte(stateJSON), volState); err != nil { return fmt.Errorf("unmarshalling volume state json: %w", err) } // Refresh the state resetVolumeState(volState) newJSON, err := json.Marshal(volState) if err != nil { return fmt.Errorf("marshalling volume state json: %w", err) } volumeStates[name] = string(newJSON) } // Write updated states back to DB, and perform additional maintenance // (Remove exit codes and exec sessions) tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning refresh transaction: %w", err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to refresh database state: %v", err) } } }() for id, json := range ctrStates { if _, err := tx.Exec("UPDATE ContainerState SET JSON=? WHERE ID=?;", json, id); err != nil { return fmt.Errorf("updating container state: %w", err) } } for id, json := range podStates { if _, err := tx.Exec("UPDATE PodState SET JSON=? WHERE ID=?;", json, id); err != nil { return fmt.Errorf("updating pod state: %w", err) } } for name, json := range volumeStates { if _, err := tx.Exec("UPDATE VolumeState SET JSON=? WHERE Name=?;", json, name); err != nil { return fmt.Errorf("updating volume state: %w", err) } } if _, err := tx.Exec("DELETE FROM ContainerExitCode;"); err != nil { return fmt.Errorf("removing container exit codes: %w", err) } if _, err := tx.Exec("DELETE FROM ContainerExecSession;"); err != nil { return fmt.Errorf("removing container exec sessions: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing transaction: %w", err) } return nil } // GetDBConfig retrieves runtime configuration fields that were created when // the database was first initialized func (s *SQLiteState) GetDBConfig() (*DBConfig, error) { if !s.valid { return nil, define.ErrDBClosed } cfg := new(DBConfig) var staticDir, tmpDir, graphRoot, runRoot, graphDriver, volumeDir string row := s.conn.QueryRow("SELECT StaticDir, TmpDir, GraphRoot, RunRoot, GraphDriver, VolumeDir FROM DBConfig;") if err := row.Scan(&staticDir, &tmpDir, &graphRoot, &runRoot, &graphDriver, &volumeDir); err != nil { if errors.Is(err, sql.ErrNoRows) { return cfg, nil } return nil, fmt.Errorf("retrieving DB config: %w", err) } cfg.LibpodRoot = staticDir cfg.LibpodTmp = tmpDir cfg.StorageRoot = graphRoot cfg.StorageTmp = runRoot cfg.GraphDriver = graphDriver cfg.VolumePath = volumeDir return cfg, nil } // ValidateDBConfig validates paths in the given runtime against the database func (s *SQLiteState) ValidateDBConfig(runtime *Runtime) (defErr error) { if !s.valid { return define.ErrDBClosed } storeOpts, err := storage.DefaultStoreOptions(rootless.IsRootless(), rootless.GetRootlessUID()) if err != nil { return err } const createRow = ` INSERT INTO DBconfig VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ? );` var ( os, staticDir, tmpDir, graphRoot, runRoot, graphDriver, volumePath string runtimeOS = goruntime.GOOS runtimeStaticDir = filepath.Clean(s.runtime.config.Engine.StaticDir) runtimeTmpDir = filepath.Clean(s.runtime.config.Engine.TmpDir) runtimeGraphRoot = filepath.Clean(s.runtime.StorageConfig().GraphRoot) runtimeRunRoot = filepath.Clean(s.runtime.StorageConfig().RunRoot) runtimeGraphDriver = s.runtime.StorageConfig().GraphDriverName runtimeVolumePath = filepath.Clean(s.runtime.config.Engine.VolumePath) ) // Some fields may be empty, indicating they are set to the default. // If so, grab the default from c/storage for them. if runtimeGraphRoot == "" { runtimeGraphRoot = storeOpts.GraphRoot } if runtimeRunRoot == "" { runtimeRunRoot = storeOpts.RunRoot } if runtimeGraphDriver == "" { runtimeGraphDriver = storeOpts.GraphDriverName } // We have to do this in a transaction to ensure mutual exclusion. // Otherwise we have a race - multiple processes can be checking the // row's existence simultaneously, both try to create it, second one to // get the transaction lock gets an error. // TODO: The transaction isn't strictly necessary, and there's a (small) // chance it's a perf hit. If it is, we can move it entirely within the // `errors.Is()` block below, with extra validation to ensure the row // still does not exist (and, if it does, to retry this function). tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning database validation transaction: %w", err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to validate database: %v", err) } } }() row := tx.QueryRow("SELECT Os, StaticDir, TmpDir, GraphRoot, RunRoot, GraphDriver, VolumeDir FROM DBConfig;") if err := row.Scan(&os, &staticDir, &tmpDir, &graphRoot, &runRoot, &graphDriver, &volumePath); err != nil { if errors.Is(err, sql.ErrNoRows) { if _, err := tx.Exec(createRow, 1, schemaVersion, runtimeOS, runtimeStaticDir, runtimeTmpDir, runtimeGraphRoot, runtimeRunRoot, runtimeGraphDriver, runtimeVolumePath); err != nil { return fmt.Errorf("adding DB config row: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing write of database validation row: %w", err) } return nil } return fmt.Errorf("retrieving DB config: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing database validation row: %w", err) } checkField := func(fieldName, dbVal, ourVal string) error { if dbVal != ourVal { return fmt.Errorf("database %s %q does not match our %s %q: %w", fieldName, dbVal, fieldName, ourVal, define.ErrDBBadConfig) } return nil } if err := checkField("os", os, runtimeOS); err != nil { return err } if err := checkField("static dir", staticDir, runtimeStaticDir); err != nil { return err } if err := checkField("tmp dir", tmpDir, runtimeTmpDir); err != nil { return err } if err := checkField("graph root", graphRoot, runtimeGraphRoot); err != nil { return err } if err := checkField("run root", runRoot, runtimeRunRoot); err != nil { return err } if err := checkField("graph driver", graphDriver, runtimeGraphDriver); err != nil { return err } if err := checkField("volume path", volumePath, runtimeVolumePath); err != nil { return err } return nil } // GetContainerName returns the name of the container associated with a given // ID. Returns ErrNoSuchCtr if the ID does not exist. func (s *SQLiteState) GetContainerName(id string) (string, error) { if id == "" { return "", define.ErrEmptyID } if !s.valid { return "", define.ErrDBClosed } var name string row := s.conn.QueryRow("SELECT Name FROM ContainerConfig WHERE ID=?;", id) if err := row.Scan(&name); err != nil { if errors.Is(err, sql.ErrNoRows) { return "", define.ErrNoSuchCtr } return "", fmt.Errorf("looking up container %s name: %w", id, err) } return name, nil } // GetPodName returns the name of the pod associated with a given ID. // Returns ErrNoSuchPod if the ID does not exist. func (s *SQLiteState) GetPodName(id string) (string, error) { if id == "" { return "", define.ErrEmptyID } if !s.valid { return "", define.ErrDBClosed } var name string row := s.conn.QueryRow("SELECT Name FROM PodConfig WHERE ID=?;", id) if err := row.Scan(&name); err != nil { if errors.Is(err, sql.ErrNoRows) { return "", define.ErrNoSuchPod } return "", fmt.Errorf("looking up pod %s name: %w", id, err) } return name, nil } // Container retrieves a single container from the state by its full ID func (s *SQLiteState) Container(id string) (*Container, error) { if id == "" { return nil, define.ErrEmptyID } if !s.valid { return nil, define.ErrDBClosed } ctrConfig, err := s.getCtrConfig(id) if err != nil { return nil, err } ctr := new(Container) ctr.config = ctrConfig ctr.state = new(ContainerState) ctr.runtime = s.runtime if err := finalizeCtrSqlite(ctr); err != nil { return nil, err } return ctr, nil } // LookupContainerID retrieves a container ID from the state by full or unique // partial ID or name func (s *SQLiteState) LookupContainerID(idOrName string) (string, error) { if idOrName == "" { return "", define.ErrEmptyID } if !s.valid { return "", define.ErrDBClosed } rows, err := s.conn.Query("SELECT ID, Name FROM ContainerConfig WHERE ContainerConfig.Name=? OR (ContainerConfig.ID LIKE ?);", idOrName, idOrName+"%") if err != nil { return "", fmt.Errorf("looking up container %q in database: %w", idOrName, err) } defer rows.Close() var ( id, name string resCount uint ) for rows.Next() { if err := rows.Scan(&id, &name); err != nil { return "", fmt.Errorf("retrieving container %q ID from database: %w", idOrName, err) } if name == idOrName { return id, nil } resCount++ } if resCount == 0 { return "", define.ErrNoSuchCtr } else if resCount > 1 { return "", fmt.Errorf("more than one result for container %q: %w", idOrName, define.ErrCtrExists) } return id, nil } // LookupContainer retrieves a container from the state by full or unique // partial ID or name func (s *SQLiteState) LookupContainer(idOrName string) (*Container, error) { if idOrName == "" { return nil, define.ErrEmptyID } if !s.valid { return nil, define.ErrDBClosed } rows, err := s.conn.Query("SELECT JSON, Name FROM ContainerConfig WHERE ContainerConfig.Name=? OR (ContainerConfig.ID LIKE ?);", idOrName, idOrName+"%") if err != nil { return nil, fmt.Errorf("looking up container %q in database: %w", idOrName, err) } defer rows.Close() var ( rawJSON, name string exactName bool resCount uint ) for rows.Next() { if err := rows.Scan(&rawJSON, &name); err != nil { return nil, fmt.Errorf("retrieving container %q ID from database: %w", idOrName, err) } if name == idOrName { exactName = true break } resCount++ } if !exactName { if resCount == 0 { return nil, fmt.Errorf("no container with name or ID %q found: %w", idOrName, define.ErrNoSuchCtr) } else if resCount > 1 { return nil, fmt.Errorf("more than one result for container %q: %w", idOrName, define.ErrCtrExists) } } ctr := new(Container) ctr.config = new(ContainerConfig) ctr.state = new(ContainerState) ctr.runtime = s.runtime if err := json.Unmarshal([]byte(rawJSON), ctr.config); err != nil { return nil, fmt.Errorf("unmarshalling container config JSON: %w", err) } if err := finalizeCtrSqlite(ctr); err != nil { return nil, err } return ctr, nil } // HasContainer checks if a container is present in the state func (s *SQLiteState) HasContainer(id string) (bool, error) { if id == "" { return false, define.ErrEmptyID } if !s.valid { return false, define.ErrDBClosed } row := s.conn.QueryRow("SELECT 1 FROM ContainerConfig WHERE ID=?;", id) var check int if err := row.Scan(&check); err != nil { if errors.Is(err, sql.ErrNoRows) { return false, nil } return false, fmt.Errorf("looking up container %s in database: %w", id, err) } else if check != 1 { return false, fmt.Errorf("check digit for container %s lookup incorrect: %w", id, define.ErrInternal) } return true, nil } // AddContainer adds a container to the state // The container being added cannot belong to a pod func (s *SQLiteState) AddContainer(ctr *Container) error { if !s.valid { return define.ErrDBClosed } if !ctr.valid { return define.ErrCtrRemoved } if ctr.config.Pod != "" { return fmt.Errorf("cannot add a container that belongs to a pod with AddContainer - use AddContainerToPod: %w", define.ErrInvalidArg) } return s.addContainer(ctr) } // RemoveContainer removes a container from the state // Only removes containers not in pods - for containers that are a member of a // pod, use RemoveContainerFromPod func (s *SQLiteState) RemoveContainer(ctr *Container) error { if !s.valid { return define.ErrDBClosed } if ctr.config.Pod != "" { return fmt.Errorf("container %s is part of a pod, use RemoveContainerFromPod instead: %w", ctr.ID(), define.ErrPodExists) } return s.removeContainer(ctr) } // UpdateContainer updates a container's state from the database func (s *SQLiteState) UpdateContainer(ctr *Container) error { if !s.valid { return define.ErrDBClosed } if !ctr.valid { return define.ErrCtrRemoved } row := s.conn.QueryRow("SELECT JSON FROM ContainerState WHERE ID=?;", ctr.ID()) var rawJSON string if err := row.Scan(&rawJSON); err != nil { if errors.Is(err, sql.ErrNoRows) { // Container was removed ctr.valid = false return fmt.Errorf("no container with ID %s found in database: %w", ctr.ID(), define.ErrNoSuchCtr) } } newState := new(ContainerState) if err := json.Unmarshal([]byte(rawJSON), newState); err != nil { return fmt.Errorf("unmarshalling container %s state JSON: %w", ctr.ID(), err) } ctr.state = newState return nil } // SaveContainer saves a container's current state in the database func (s *SQLiteState) SaveContainer(ctr *Container) (defErr error) { if !s.valid { return define.ErrDBClosed } if !ctr.valid { return define.ErrCtrRemoved } stateJSON, err := json.Marshal(ctr.state) if err != nil { return fmt.Errorf("marshalling container %s state JSON: %w", ctr.ID(), err) } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning container %s save transaction: %w", ctr.ID(), err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to save container %s state: %v", ctr.ID(), err) } } }() result, err := tx.Exec("UPDATE ContainerState SET JSON=? WHERE ID=?;", stateJSON, ctr.ID()) if err != nil { return fmt.Errorf("writing container %s state: %w", ctr.ID(), err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("retrieving container %s save rows affected: %w", ctr.ID(), err) } if rows == 0 { ctr.valid = false return define.ErrNoSuchCtr } if err := tx.Commit(); err != nil { return fmt.Errorf("committing container %s state: %w", ctr.ID(), err) } return nil } // ContainerInUse checks if other containers depend on the given container // It returns a slice of the IDs of the containers depending on the given // container. If the slice is empty, no containers depend on the given container func (s *SQLiteState) ContainerInUse(ctr *Container) ([]string, error) { if !s.valid { return nil, define.ErrDBClosed } if !ctr.valid { return nil, define.ErrCtrRemoved } rows, err := s.conn.Query("SELECT ID FROM ContainerDependency WHERE DependencyID=?;", ctr.ID()) if err != nil { return nil, fmt.Errorf("retrieving containers that depend on container %s: %w", ctr.ID(), err) } defer rows.Close() deps := []string{} for rows.Next() { var dep string if err := rows.Scan(&dep); err != nil { return nil, fmt.Errorf("reading containers that depend on %s: %w", ctr.ID(), err) } deps = append(deps, dep) } return deps, nil } // AllContainers retrieves all the containers in the database // If `loadState` is set, the containers' state will be loaded as well. func (s *SQLiteState) AllContainers(loadState bool) ([]*Container, error) { if !s.valid { return nil, define.ErrDBClosed } ctrs := []*Container{} if loadState { rows, err := s.conn.Query("SELECT ContainerConfig.JSON, ContainerState.JSON AS StateJSON FROM ContainerConfig INNER JOIN ContainerState ON ContainerConfig.ID = ContainerState.ID;") if err != nil { return nil, fmt.Errorf("retrieving all containers from database: %w", err) } defer rows.Close() for rows.Next() { var configJSON, stateJSON string if err := rows.Scan(&configJSON, &stateJSON); err != nil { return nil, fmt.Errorf("scanning container from database: %w", err) } ctr := new(Container) ctr.config = new(ContainerConfig) ctr.state = new(ContainerState) ctr.runtime = s.runtime if err := json.Unmarshal([]byte(configJSON), ctr.config); err != nil { return nil, fmt.Errorf("unmarshalling container config: %w", err) } if err := json.Unmarshal([]byte(stateJSON), ctr.state); err != nil { return nil, fmt.Errorf("unmarshalling container %s state: %w", ctr.ID(), err) } ctrs = append(ctrs, ctr) } } else { rows, err := s.conn.Query("SELECT JSON FROM ContainerConfig;") if err != nil { return nil, fmt.Errorf("retrieving all containers from database: %w", err) } defer rows.Close() for rows.Next() { var rawJSON string if err := rows.Scan(&rawJSON); err != nil { return nil, fmt.Errorf("scanning container from database: %w", err) } ctr := new(Container) ctr.config = new(ContainerConfig) ctr.state = new(ContainerState) ctr.runtime = s.runtime if err := json.Unmarshal([]byte(rawJSON), ctr.config); err != nil { return nil, fmt.Errorf("unmarshalling container config: %w", err) } ctrs = append(ctrs, ctr) } } for _, ctr := range ctrs { if err := finalizeCtrSqlite(ctr); err != nil { return nil, err } } return ctrs, nil } // GetNetworks returns the networks this container is a part of. func (s *SQLiteState) GetNetworks(ctr *Container) (map[string]types.PerNetworkOptions, error) { if !s.valid { return nil, define.ErrDBClosed } if !ctr.valid { return nil, define.ErrCtrRemoved } // if the network mode is not bridge return no networks if !ctr.config.NetMode.IsBridge() { return nil, nil } cfg, err := s.getCtrConfig(ctr.ID()) if err != nil { if errors.Is(err, define.ErrNoSuchCtr) { ctr.valid = false } return nil, err } return cfg.Networks, nil } // NetworkConnect adds the given container to the given network. If aliases are // specified, those will be added to the given network. func (s *SQLiteState) NetworkConnect(ctr *Container, network string, opts types.PerNetworkOptions) error { return s.networkModify(ctr, network, opts, true, false) } // NetworkModify will allow you to set new options on an existing connected network func (s *SQLiteState) NetworkModify(ctr *Container, network string, opts types.PerNetworkOptions) error { return s.networkModify(ctr, network, opts, false, false) } // NetworkDisconnect disconnects the container from the given network, also // removing any aliases in the network. func (s *SQLiteState) NetworkDisconnect(ctr *Container, network string) error { return s.networkModify(ctr, network, types.PerNetworkOptions{}, false, true) } // GetContainerConfig returns a container config from the database by full ID func (s *SQLiteState) GetContainerConfig(id string) (*ContainerConfig, error) { if len(id) == 0 { return nil, define.ErrEmptyID } if !s.valid { return nil, define.ErrDBClosed } return s.getCtrConfig(id) } // AddContainerExitCode adds the exit code for the specified container to the database. func (s *SQLiteState) AddContainerExitCode(id string, exitCode int32) (defErr error) { if len(id) == 0 { return define.ErrEmptyID } if !s.valid { return define.ErrDBClosed } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning transaction to add exit code: %w", err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to add exit code: %v", err) } } }() if _, err := tx.Exec("INSERT OR REPLACE INTO ContainerExitCode VALUES (?, ?, ?);", id, time.Now().Unix(), exitCode); err != nil { return fmt.Errorf("adding container %s exit code %d: %w", id, exitCode, err) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing transaction to add exit code: %w", err) } return nil } // GetContainerExitCode returns the exit code for the specified container. func (s *SQLiteState) GetContainerExitCode(id string) (int32, error) { if len(id) == 0 { return -1, define.ErrEmptyID } if !s.valid { return -1, define.ErrDBClosed } row := s.conn.QueryRow("SELECT ExitCode FROM ContainerExitCode WHERE ID=?;", id) var exitCode int32 if err := row.Scan(&exitCode); err != nil { if errors.Is(err, sql.ErrNoRows) { return -1, fmt.Errorf("getting exit code of container %s from DB: %w", id, define.ErrNoSuchExitCode) } return -1, fmt.Errorf("scanning exit code of container %s: %w", id, err) } return exitCode, nil } // GetContainerExitCodeTimeStamp returns the time stamp when the exit code of // the specified container was added to the database. func (s *SQLiteState) GetContainerExitCodeTimeStamp(id string) (*time.Time, error) { if len(id) == 0 { return nil, define.ErrEmptyID } if !s.valid { return nil, define.ErrDBClosed } row := s.conn.QueryRow("SELECT Timestamp FROM ContainerExitCode WHERE ID=?;", id) var timestamp int64 if err := row.Scan(×tamp); err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, fmt.Errorf("getting timestamp for exit code of container %s from DB: %w", id, define.ErrNoSuchExitCode) } return nil, fmt.Errorf("scanning exit timestamp of container %s: %w", id, err) } result := time.Unix(timestamp, 0) return &result, nil } // PruneExitCodes removes exit codes older than 5 minutes unless the associated // container still exists. func (s *SQLiteState) PruneContainerExitCodes() (defErr error) { if !s.valid { return define.ErrDBClosed } fiveMinsAgo := time.Now().Add(-5 * time.Minute).Unix() tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning transaction to remove old timestamps: %w", err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to remove old timestamps: %v", err) } } }() if _, err := tx.Exec("DELETE FROM ContainerExitCode WHERE (Timestamp <= ?) AND (ID NOT IN (SELECT ID FROM ContainerConfig))", fiveMinsAgo); err != nil { return fmt.Errorf("removing exit codes with timestamps older than 5 minutes: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing transaction to remove old timestamps: %w", err) } return nil } // AddExecSession adds an exec session to the state. func (s *SQLiteState) AddExecSession(ctr *Container, session *ExecSession) (defErr error) { if !s.valid { return define.ErrDBClosed } if !ctr.valid { return define.ErrCtrRemoved } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning container %s exec session %s add transaction: %w", ctr.ID(), session.Id, err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to add container %s exec session %s: %v", ctr.ID(), session.Id, err) } } }() if _, err := tx.Exec("INSERT INTO ContainerExecSession VALUES (?, ?);", session.Id, ctr.ID()); err != nil { return fmt.Errorf("adding container %s exec session %s to database: %w", ctr.ID(), session.Id, err) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing container %s exec session %s addition: %w", ctr.ID(), session.Id, err) } return nil } // GetExecSession returns the ID of the container an exec session is associated // with. func (s *SQLiteState) GetExecSession(id string) (string, error) { if !s.valid { return "", define.ErrDBClosed } if id == "" { return "", define.ErrEmptyID } row := s.conn.QueryRow("SELECT ContainerID FROM ContainerExecSession WHERE ID=?;", id) var ctrID string if err := row.Scan(&ctrID); err != nil { if errors.Is(err, sql.ErrNoRows) { return "", fmt.Errorf("no exec session with ID %s found: %w", id, define.ErrNoSuchExecSession) } return "", fmt.Errorf("retrieving exec session %s from database: %w", id, err) } return ctrID, nil } // RemoveExecSession removes references to the given exec session in the // database. func (s *SQLiteState) RemoveExecSession(session *ExecSession) (defErr error) { if !s.valid { return define.ErrDBClosed } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning container %s exec session %s remove transaction: %w", session.ContainerId, session.Id, err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to remove container %s exec session %s: %v", session.ContainerId, session.Id, err) } } }() result, err := tx.Exec("DELETE FROM ContainerExecSession WHERE ID=?;", session.Id) if err != nil { return fmt.Errorf("removing container %s exec session %s from database: %w", session.ContainerId, session.Id, err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("retrieving container %s exec session %s removal rows modified: %w", session.ContainerId, session.Id, err) } if rows == 0 { return define.ErrNoSuchExecSession } if err := tx.Commit(); err != nil { return fmt.Errorf("committing container %s exec session %s removal: %w", session.ContainerId, session.Id, err) } return nil } // GetContainerExecSessions retrieves the IDs of all exec sessions running in a // container that the database is aware of (IE, were added via AddExecSession). func (s *SQLiteState) GetContainerExecSessions(ctr *Container) ([]string, error) { if !s.valid { return nil, define.ErrDBClosed } if !ctr.valid { return nil, define.ErrCtrRemoved } rows, err := s.conn.Query("SELECT ID FROM ContainerExecSession WHERE ContainerID=?;", ctr.ID()) if err != nil { return nil, fmt.Errorf("querying container %s exec sessions: %w", ctr.ID(), err) } defer rows.Close() var sessions []string for rows.Next() { var session string if err := rows.Scan(&session); err != nil { return nil, fmt.Errorf("scanning container %s exec sessions row: %w", ctr.ID(), err) } sessions = append(sessions, session) } return sessions, nil } // RemoveContainerExecSessions removes all exec sessions attached to a given // container. func (s *SQLiteState) RemoveContainerExecSessions(ctr *Container) (defErr error) { if !s.valid { return define.ErrDBClosed } if !ctr.valid { return define.ErrCtrRemoved } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning container %s exec session removal transaction: %w", ctr.ID(), err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to remove container %s exec sessions: %v", ctr.ID(), err) } } }() if _, err := tx.Exec("DELETE FROM ContainerExecSession WHERE ContainerID=?;", ctr.ID()); err != nil { return fmt.Errorf("removing container %s exec sessions from database: %w", ctr.ID(), err) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing container %s exec session removal: %w", ctr.ID(), err) } return nil } // RewriteContainerConfig rewrites a container's configuration. // DO NOT USE TO: Change container dependencies, change pod membership, change // container ID. // WARNING: This function is DANGEROUS. Do not use without reading the full // comment on this function in state.go. // TODO: Once BoltDB is removed, this can be combined with SafeRewriteContainerConfig. func (s *SQLiteState) RewriteContainerConfig(ctr *Container, newCfg *ContainerConfig) error { if !s.valid { return define.ErrDBClosed } if !ctr.valid { return define.ErrCtrRemoved } return s.rewriteContainerConfig(ctr, newCfg) } // SafeRewriteContainerConfig rewrites a container's configuration in a more // limited fashion than RewriteContainerConfig. It is marked as safe to use // under most circumstances, unlike RewriteContainerConfig. // DO NOT USE TO: Change container dependencies, change pod membership, change // locks, change container ID. // TODO: Once BoltDB is removed, this can be combined with RewriteContainerConfig. func (s *SQLiteState) SafeRewriteContainerConfig(ctr *Container, oldName, newName string, newCfg *ContainerConfig) error { if !s.valid { return define.ErrDBClosed } if !ctr.valid { return define.ErrCtrRemoved } if newName != "" && newCfg.Name != newName { return fmt.Errorf("new name %s for container %s must match name in given container config: %w", newName, ctr.ID(), define.ErrInvalidArg) } if newName != "" && oldName == "" { return fmt.Errorf("must provide old name for container if a new name is given: %w", define.ErrInvalidArg) } return s.rewriteContainerConfig(ctr, newCfg) } // RewritePodConfig rewrites a pod's configuration. // WARNING: This function is DANGEROUS. Do not use without reading the full // comment on this function in state.go. func (s *SQLiteState) RewritePodConfig(pod *Pod, newCfg *PodConfig) (defErr error) { if !s.valid { return define.ErrDBClosed } if !pod.valid { return define.ErrPodRemoved } json, err := json.Marshal(newCfg) if err != nil { return fmt.Errorf("error marshalling pod %s config JSON: %w", pod.ID(), err) } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning transaction to rewrite pod %s config: %w", pod.ID(), err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to rewrite pod %s config: %v", pod.ID(), err) } } }() results, err := tx.Exec("UPDATE PodConfig SET Name=?, JSON=? WHERE ID=?;", newCfg.Name, json, pod.ID()) if err != nil { return fmt.Errorf("updating pod config table with new configuration for pod %s: %w", pod.ID(), err) } rows, err := results.RowsAffected() if err != nil { return fmt.Errorf("retrieving pod %s config rewrite rows affected: %w", pod.ID(), err) } if rows == 0 { pod.valid = false return fmt.Errorf("no pod with ID %s found in DB: %w", pod.ID(), define.ErrNoSuchPod) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing transaction to rewrite pod %s config: %w", pod.ID(), err) } return nil } // RewriteVolumeConfig rewrites a volume's configuration. // WARNING: This function is DANGEROUS. Do not use without reading the full // comment on this function in state.go. func (s *SQLiteState) RewriteVolumeConfig(volume *Volume, newCfg *VolumeConfig) (defErr error) { if !s.valid { return define.ErrDBClosed } if !volume.valid { return define.ErrVolumeRemoved } json, err := json.Marshal(newCfg) if err != nil { return fmt.Errorf("error marshalling volume %s new config JSON: %w", volume.Name(), err) } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning transaction to rewrite volume %s config: %w", volume.Name(), err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to rewrite volume %s config: %v", volume.Name(), err) } } }() results, err := tx.Exec("UPDATE VolumeConfig SET Name=?, JSON=? WHERE ID=?;", newCfg.Name, json, volume.Name()) if err != nil { return fmt.Errorf("updating volume config table with new configuration for volume %s: %w", volume.Name(), err) } rows, err := results.RowsAffected() if err != nil { return fmt.Errorf("retrieving volume %s config rewrite rows affected: %w", volume.Name(), err) } if rows == 0 { volume.valid = false return fmt.Errorf("no volume with name %q found in DB: %w", volume.Name(), define.ErrNoSuchVolume) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing transaction to rewrite volume %s config: %w", volume.Name(), err) } return nil } // Pod retrieves a pod given its full ID func (s *SQLiteState) Pod(id string) (*Pod, error) { if id == "" { return nil, define.ErrEmptyID } if !s.valid { return nil, define.ErrDBClosed } row := s.conn.QueryRow("SELECT JSON FROM PodConfig WHERE ID=?;", id) var rawJSON string if err := row.Scan(&rawJSON); err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, define.ErrNoSuchPod } return nil, fmt.Errorf("retrieving pod %s config from DB: %w", id, err) } ctrCfg := new(ContainerConfig) if err := json.Unmarshal([]byte(rawJSON), ctrCfg); err != nil { return nil, fmt.Errorf("unmarshalling container %s config: %w", id, err) } return s.createPod(rawJSON) } // LookupPod retrieves a pod from a full or unique partial ID, or a name. func (s *SQLiteState) LookupPod(idOrName string) (*Pod, error) { if idOrName == "" { return nil, define.ErrEmptyID } if !s.valid { return nil, define.ErrDBClosed } rows, err := s.conn.Query("SELECT JSON, Name FROM PodConfig WHERE PodConfig.Name=? OR (PodConfig.ID LIKE ?);", idOrName, idOrName+"%") if err != nil { return nil, fmt.Errorf("looking up pod %q in database: %w", idOrName, err) } defer rows.Close() var ( rawJSON, name string exactName bool resCount uint ) for rows.Next() { if err := rows.Scan(&rawJSON, &name); err != nil { return nil, fmt.Errorf("error retrieving pod %q ID from database: %w", idOrName, err) } if name == idOrName { exactName = true break } resCount++ } if !exactName { if resCount == 0 { return nil, fmt.Errorf("no pod with name or ID %s found: %w", idOrName, define.ErrNoSuchPod) } else if resCount > 1 { return nil, fmt.Errorf("more than one result for pod %q: %w", idOrName, define.ErrCtrExists) } } return s.createPod(rawJSON) } // HasPod checks if a pod with the given ID exists in the state func (s *SQLiteState) HasPod(id string) (bool, error) { if id == "" { return false, define.ErrEmptyID } if !s.valid { return false, define.ErrDBClosed } row := s.conn.QueryRow("SELECT 1 FROM PodConfig WHERE ID=?;", id) var check int if err := row.Scan(&check); err != nil { if errors.Is(err, sql.ErrNoRows) { return false, nil } return false, fmt.Errorf("looking up pod %s in database: %w", id, err) } else if check != 1 { return false, fmt.Errorf("check digit for pod %s lookup incorrect: %w", id, define.ErrInternal) } return true, nil } // PodHasContainer checks if the given pod has a container with the given ID func (s *SQLiteState) PodHasContainer(pod *Pod, id string) (bool, error) { if id == "" { return false, define.ErrEmptyID } if !s.valid { return false, define.ErrDBClosed } if !pod.valid { return false, define.ErrPodRemoved } var check int row := s.conn.QueryRow("SELECT 1 FROM ContainerConfig WHERE ID=? AND PodID=?;", id, pod.ID()) if err := row.Scan(&check); err != nil { if errors.Is(err, sql.ErrNoRows) { return false, nil } return false, fmt.Errorf("checking if pod %s has container %s in database: %w", pod.ID(), id, err) } else if check != 1 { return false, fmt.Errorf("check digit for pod %s lookup incorrect: %w", id, define.ErrInternal) } return true, nil } // PodContainersByID returns the IDs of all containers present in the given pod func (s *SQLiteState) PodContainersByID(pod *Pod) ([]string, error) { if !s.valid { return nil, define.ErrDBClosed } if !pod.valid { return nil, define.ErrPodRemoved } rows, err := s.conn.Query("SELECT ID FROM ContainerConfig WHERE PodID=?;", pod.ID()) if err != nil { return nil, fmt.Errorf("retrieving container IDs of pod %s from database: %w", pod.ID(), err) } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { return nil, fmt.Errorf("scanning container from database: %w", err) } ids = append(ids, id) } return ids, nil } // PodContainers returns all the containers present in the given pod func (s *SQLiteState) PodContainers(pod *Pod) ([]*Container, error) { if !s.valid { return nil, define.ErrDBClosed } if !pod.valid { return nil, define.ErrPodRemoved } rows, err := s.conn.Query("SELECT JSON FROM ContainerConfig WHERE PodID=?;", pod.ID()) if err != nil { return nil, fmt.Errorf("retrieving containers of pod %s from database: %w", pod.ID(), err) } defer rows.Close() var ctrs []*Container for rows.Next() { var rawJSON string if err := rows.Scan(&rawJSON); err != nil { return nil, fmt.Errorf("scanning container from database: %w", err) } ctr := new(Container) ctr.config = new(ContainerConfig) ctr.state = new(ContainerState) ctr.runtime = s.runtime if err := json.Unmarshal([]byte(rawJSON), ctr.config); err != nil { return nil, fmt.Errorf("unmarshalling container config: %w", err) } ctrs = append(ctrs, ctr) } for _, ctr := range ctrs { if err := finalizeCtrSqlite(ctr); err != nil { return nil, err } } return ctrs, nil } // AddPod adds the given pod to the state. func (s *SQLiteState) AddPod(pod *Pod) (defErr error) { if !s.valid { return define.ErrDBClosed } if !pod.valid { return define.ErrPodRemoved } infraID := sql.NullString{} if pod.state.InfraContainerID != "" { if err := infraID.Scan(pod.state.InfraContainerID); err != nil { return fmt.Errorf("scanning infra container ID %q: %w", pod.state.InfraContainerID, err) } } configJSON, err := json.Marshal(pod.config) if err != nil { return fmt.Errorf("marshalling pod config json: %w", err) } stateJSON, err := json.Marshal(pod.state) if err != nil { return fmt.Errorf("marshalling pod state json: %w", err) } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning pod create transaction: %w", err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to create pod: %v", err) } } }() // TODO: explore whether there's a more idiomatic way to do error checks for the name. // There is a sqlite3.ErrConstraintUnique error but I (vrothberg) couldn't find a way // to work with the returned errors yet. var check int row := tx.QueryRow("SELECT 1 FROM PodConfig WHERE Name=?;", pod.Name()) if err := row.Scan(&check); err != nil { if !errors.Is(err, sql.ErrNoRows) { return fmt.Errorf("checking if pod name %s exists in database: %w", pod.Name(), err) } } else if check != 0 { return fmt.Errorf("name %q is in use: %w", pod.Name(), define.ErrPodExists) } if _, err := tx.Exec("INSERT INTO IDNamespace VALUES (?);", pod.ID()); err != nil { return fmt.Errorf("adding pod id to database: %w", err) } if _, err := tx.Exec("INSERT INTO PodConfig VALUES (?, ?, ?);", pod.ID(), pod.Name(), configJSON); err != nil { return fmt.Errorf("adding pod config to database: %w", err) } if _, err := tx.Exec("INSERT INTO PodState VALUES (?, ?, ?);", pod.ID(), infraID, stateJSON); err != nil { return fmt.Errorf("adding pod state to database: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing transaction: %w", err) } return nil } // RemovePod removes the given pod from the state. // Only empty pods can be removed. func (s *SQLiteState) RemovePod(pod *Pod) (defErr error) { if !s.valid { return define.ErrDBClosed } if !pod.valid { return define.ErrPodRemoved } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning pod %s removal transaction: %w", pod.ID(), err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to remove pod %s: %v", pod.ID(), err) } } }() var check int row := tx.QueryRow("SELECT 1 FROM ContainerConfig WHERE PodID=? AND ID!=?;", pod.ID(), pod.state.InfraContainerID) if err := row.Scan(&check); err != nil { if !errors.Is(err, sql.ErrNoRows) { return fmt.Errorf("checking if pod %s has containers in database: %w", pod.ID(), err) } } else if check != 0 { return fmt.Errorf("pod %s is not empty: %w", pod.ID(), define.ErrCtrExists) } checkResult := func(result sql.Result) error { rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("retrieving pod %s delete rows affected: %w", pod.ID(), err) } if rows == 0 { pod.valid = false return define.ErrNoSuchPod } return nil } result, err := tx.Exec("DELETE FROM IDNamespace WHERE ID=?;", pod.ID()) if err != nil { return fmt.Errorf("removing pod %s id from database: %w", pod.ID(), err) } if err := checkResult(result); err != nil { return err } result, err = tx.Exec("DELETE FROM PodConfig WHERE ID=?;", pod.ID()) if err != nil { return fmt.Errorf("removing pod %s config from database: %w", pod.ID(), err) } if err := checkResult(result); err != nil { return err } result, err = tx.Exec("DELETE FROM PodState WHERE ID=?;", pod.ID()) if err != nil { return fmt.Errorf("removing pod %s state from database: %w", pod.ID(), err) } if err := checkResult(result); err != nil { return err } if err := tx.Commit(); err != nil { return fmt.Errorf("committing pod %s removal transaction: %w", pod.ID(), err) } return nil } // RemovePodContainers removes all containers in a pod. func (s *SQLiteState) RemovePodContainers(pod *Pod) (defErr error) { if !s.valid { return define.ErrDBClosed } if !pod.valid { return define.ErrPodRemoved } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning removal transaction for containers of pod %s: %w", pod.ID(), err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to remove containers of pod %s: %v", pod.ID(), err) } } }() rows, err := tx.Query("SELECT ID FROM ContainerConfig WHERE PodID=?;", pod.ID()) if err != nil { return fmt.Errorf("retrieving container IDs of pod %s from database: %w", pod.ID(), err) } defer rows.Close() for rows.Next() { var id string if err := rows.Scan(&id); err != nil { return fmt.Errorf("scanning container from database: %w", err) } if err := s.removeContainerWithTx(id, tx); err != nil { return err } } return nil } // AddContainerToPod adds the given container to an existing pod // The container will be added to the state and the pod func (s *SQLiteState) AddContainerToPod(pod *Pod, ctr *Container) error { if !s.valid { return define.ErrDBClosed } if !pod.valid { return define.ErrPodRemoved } if !ctr.valid { return define.ErrCtrRemoved } if ctr.config.Pod != pod.ID() { return fmt.Errorf("container %s is not part of pod %s: %w", ctr.ID(), pod.ID(), define.ErrNoSuchCtr) } return s.addContainer(ctr) } // RemoveContainerFromPod removes a container from an existing pod // The container will also be removed from the state func (s *SQLiteState) RemoveContainerFromPod(pod *Pod, ctr *Container) error { if !s.valid { return define.ErrDBClosed } if !pod.valid { return define.ErrPodRemoved } if ctr.config.Pod == "" { return fmt.Errorf("container %s is not part of a pod, use RemoveContainer instead: %w", ctr.ID(), define.ErrNoSuchPod) } if ctr.config.Pod != pod.ID() { return fmt.Errorf("container %s is not part of pod %s: %w", ctr.ID(), pod.ID(), define.ErrInvalidArg) } return s.removeContainer(ctr) } // UpdatePod updates a pod's state from the database. func (s *SQLiteState) UpdatePod(pod *Pod) error { if !s.valid { return define.ErrDBClosed } if !pod.valid { return define.ErrPodRemoved } row := s.conn.QueryRow("SELECT JSON FROM PodState WHERE ID=?;", pod.ID()) var rawJSON string if err := row.Scan(&rawJSON); err != nil { if errors.Is(err, sql.ErrNoRows) { // Pod was removed pod.valid = false return fmt.Errorf("no pod with ID %s found in database: %w", pod.ID(), define.ErrNoSuchPod) } } newState := new(podState) if err := json.Unmarshal([]byte(rawJSON), newState); err != nil { return fmt.Errorf("unmarshalling pod %s state JSON: %w", pod.ID(), err) } pod.state = newState return nil } // SavePod saves a pod's state to the database. func (s *SQLiteState) SavePod(pod *Pod) (defErr error) { if !s.valid { return define.ErrDBClosed } if !pod.valid { return define.ErrPodRemoved } stateJSON, err := json.Marshal(pod.state) if err != nil { return fmt.Errorf("marshalling pod %s state JSON: %w", pod.ID(), err) } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning pod %s save transaction: %w", pod.ID(), err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to save pod %s state: %v", pod.ID(), err) } } }() result, err := tx.Exec("UPDATE PodState SET JSON=? WHERE ID=?;", stateJSON, pod.ID()) if err != nil { return fmt.Errorf("writing pod %s state: %w", pod.ID(), err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("retrieving pod %s save rows affected: %w", pod.ID(), err) } if rows == 0 { pod.valid = false return define.ErrNoSuchPod } if err := tx.Commit(); err != nil { return fmt.Errorf("committing pod %s state: %w", pod.ID(), err) } return nil } // AllPods returns all pods present in the state. func (s *SQLiteState) AllPods() ([]*Pod, error) { if !s.valid { return nil, define.ErrDBClosed } pods := []*Pod{} rows, err := s.conn.Query("SELECT JSON FROM PodConfig;") if err != nil { return nil, fmt.Errorf("retrieving all pods from database: %w", err) } defer rows.Close() for rows.Next() { var rawJSON string if err := rows.Scan(&rawJSON); err != nil { return nil, fmt.Errorf("scanning pod from database: %w", err) } pod, err := s.createPod(rawJSON) if err != nil { return nil, err } pods = append(pods, pod) } return pods, nil } // AddVolume adds the given volume to the state. It also adds ctrDepID to // the sub bucket holding the container dependencies that this volume has func (s *SQLiteState) AddVolume(volume *Volume) (defErr error) { if !s.valid { return define.ErrDBClosed } if !volume.valid { return define.ErrVolumeRemoved } cfgJSON, err := json.Marshal(volume.config) if err != nil { return fmt.Errorf("marshalling volume %s configuration json: %w", volume.Name(), err) } volState := volume.state if volState == nil { volState = new(VolumeState) } stateJSON, err := json.Marshal(volState) if err != nil { return fmt.Errorf("marshalling volume %s state json: %w", volume.Name(), err) } storageID := sql.NullString{} if volume.config.StorageID != "" { storageID.Valid = true storageID.String = volume.config.StorageID } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning volume create transaction: %w", err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to create volume: %v", err) } } }() // TODO: There has to be a better way of doing this var check int row := tx.QueryRow("SELECT 1 FROM VolumeConfig WHERE Name=?;", volume.Name()) if err := row.Scan(&check); err != nil { if !errors.Is(err, sql.ErrNoRows) { return fmt.Errorf("checking if volume name %s exists in database: %w", volume.Name(), err) } } else if check != 0 { return fmt.Errorf("name %q is in use: %w", volume.Name(), define.ErrVolumeExists) } if _, err := tx.Exec("INSERT INTO VolumeConfig VALUES (?, ?, ?);", volume.Name(), storageID, cfgJSON); err != nil { return fmt.Errorf("adding volume %s config to database: %w", volume.Name(), err) } if _, err := tx.Exec("INSERT INTO VolumeState VALUES (?, ?);", volume.Name(), stateJSON); err != nil { return fmt.Errorf("adding volume %s state to database: %w", volume.Name(), err) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing transaction: %w", err) } return nil } // RemoveVolume removes the given volume from the state func (s *SQLiteState) RemoveVolume(volume *Volume) (defErr error) { if !s.valid { return define.ErrDBClosed } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning volume %s removal transaction: %w", volume.Name(), err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to remove volume %s: %v", volume.Name(), err) } } }() rows, err := tx.Query("SELECT ContainerID FROM ContainerVolume WHERE VolumeName=?;", volume.Name()) if err != nil { return fmt.Errorf("querying for containers using volume %s: %w", volume.Name(), err) } defer rows.Close() var ctrs []string for rows.Next() { var ctr string if err := rows.Scan(&ctr); err != nil { return fmt.Errorf("error scanning row for containers using volume %s: %w", volume.Name(), err) } ctrs = append(ctrs, ctr) } if len(ctrs) > 0 { return fmt.Errorf("volume %s is in use by containers %s: %w", volume.Name(), strings.Join(ctrs, ","), define.ErrVolumeBeingUsed) } // TODO TODO TODO: // Need to verify that at least 1 row was deleted from VolumeConfig. // Otherwise return ErrNoSuchVolume if _, err := tx.Exec("DELETE FROM VolumeConfig WHERE Name=?;", volume.Name()); err != nil { return fmt.Errorf("removing volume %s config from DB: %w", volume.Name(), err) } if _, err := tx.Exec("DELETE FROM VolumeState WHERE Name=?;", volume.Name()); err != nil { return fmt.Errorf("removing volume %s state from DB: %w", volume.Name(), err) } if err := tx.Commit(); err != nil { return fmt.Errorf("committing transaction to remove volume %s: %w", volume.Name(), err) } return nil } // UpdateVolume updates the volume's state from the database. func (s *SQLiteState) UpdateVolume(volume *Volume) error { if !s.valid { return define.ErrDBClosed } if !volume.valid { return define.ErrVolumeRemoved } row := s.conn.QueryRow("SELECT JSON FROM VolumeState WHERE Name=?;", volume.Name()) var stateJSON string if err := row.Scan(&stateJSON); err != nil { if errors.Is(err, sql.ErrNoRows) { volume.valid = false return define.ErrNoSuchVolume } return fmt.Errorf("scanning volume %s state JSON: %w", volume.Name(), err) } newState := new(VolumeState) if err := json.Unmarshal([]byte(stateJSON), newState); err != nil { return fmt.Errorf("unmarshalling volume %s state: %w", volume.Name(), err) } volume.state = newState return nil } // SaveVolume saves the volume's state to the database. func (s *SQLiteState) SaveVolume(volume *Volume) (defErr error) { if !s.valid { return define.ErrDBClosed } if !volume.valid { return define.ErrVolumeRemoved } stateJSON, err := json.Marshal(volume.state) if err != nil { return fmt.Errorf("marshalling volume %s state JSON: %w", volume.Name(), err) } tx, err := s.conn.Begin() if err != nil { return fmt.Errorf("beginning transaction to rewrite volume %s state: %w", volume.Name(), err) } defer func() { if defErr != nil { if err := tx.Rollback(); err != nil { logrus.Errorf("Rolling back transaction to rewrite volume %s state: %v", volume.Name(), err) } } }() results, err := tx.Exec("UPDATE VolumeState SET JSON=? WHERE Name=?;", stateJSON, volume.Name()) if err != nil { return fmt.Errorf("updating volume %s state in DB: %w", volume.Name(), err) } rows, err := results.RowsAffected() if err != nil { return fmt.Errorf("retrieving volume %s state rewrite rows affected: %w", volume.Name(), err) } if rows == 0 { volume.valid = false return define.ErrNoSuchVolume } if err := tx.Commit(); err != nil { return fmt.Errorf("committing transaction to rewrite volume %s state: %w", volume.Name(), err) } return nil } // AllVolumes returns all volumes present in the state. func (s *SQLiteState) AllVolumes() ([]*Volume, error) { if !s.valid { return nil, define.ErrDBClosed } rows, err := s.conn.Query("SELECT JSON FROM VolumeConfig;") if err != nil { return nil, fmt.Errorf("querying database for all volumes: %w", err) } defer rows.Close() var volumes []*Volume for rows.Next() { var configJSON string if err := rows.Scan(&configJSON); err != nil { return nil, fmt.Errorf("scanning volume config from database: %w", err) } vol := new(Volume) vol.config = new(VolumeConfig) vol.state = new(VolumeState) vol.runtime = s.runtime if err := json.Unmarshal([]byte(configJSON), vol.config); err != nil { return nil, fmt.Errorf("unmarshalling volume config: %w", err) } if err := finalizeVolumeSqlite(vol); err != nil { return nil, err } volumes = append(volumes, vol) } return volumes, nil } // Volume retrieves a volume from full name. func (s *SQLiteState) Volume(name string) (*Volume, error) { if name == "" { return nil, define.ErrEmptyID } if !s.valid { return nil, define.ErrDBClosed } row := s.conn.QueryRow("SELECT JSON FROM VolumeConfig WHERE Name=?;", name) var configJSON string if err := row.Scan(&configJSON); err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, define.ErrNoSuchVolume } } vol := new(Volume) vol.config = new(VolumeConfig) vol.state = new(VolumeState) vol.runtime = s.runtime if err := json.Unmarshal([]byte(configJSON), vol.config); err != nil { return nil, fmt.Errorf("unmarshalling volume %s config JSON: %w", name, err) } if err := finalizeVolumeSqlite(vol); err != nil { return nil, err } return vol, nil } // LookupVolume locates a volume from a unique partial name. func (s *SQLiteState) LookupVolume(name string) (*Volume, error) { if name == "" { return nil, define.ErrEmptyID } if !s.valid { return nil, define.ErrDBClosed } rows, err := s.conn.Query("SELECT Name, JSON FROM VolumeConfig WHERE Name LIKE ? ORDER BY LENGTH(Name) ASC;", name+"%") if err != nil { return nil, fmt.Errorf("querying database for volume %s: %w", name, err) } defer rows.Close() var foundName, configJSON string for rows.Next() { if foundName != "" { return nil, fmt.Errorf("more than one result for volume name %s: %w", name, define.ErrVolumeExists) } if err := rows.Scan(&foundName, &configJSON); err != nil { return nil, fmt.Errorf("retrieving volume %s config from database: %w", name, err) } if foundName == name { break } } if foundName == "" { return nil, fmt.Errorf("no volume with name %q found: %w", name, define.ErrNoSuchVolume) } vol := new(Volume) vol.config = new(VolumeConfig) vol.state = new(VolumeState) vol.runtime = s.runtime if err := json.Unmarshal([]byte(configJSON), vol.config); err != nil { return nil, fmt.Errorf("unmarshalling volume %s config JSON: %w", name, err) } if err := finalizeVolumeSqlite(vol); err != nil { return nil, err } return vol, nil } // HasVolume returns true if the given volume exists in the state. // Otherwise it returns false. func (s *SQLiteState) HasVolume(name string) (bool, error) { if name == "" { return false, define.ErrEmptyID } if !s.valid { return false, define.ErrDBClosed } row := s.conn.QueryRow("SELECT 1 FROM VolumeConfig WHERE Name=?;", name) var check int if err := row.Scan(&check); err != nil { if errors.Is(err, sql.ErrNoRows) { return false, nil } return false, fmt.Errorf("looking up volume %s in database: %w", name, err) } if check != 1 { return false, fmt.Errorf("check digit for volume %s lookup incorrect: %w", name, define.ErrInternal) } return true, nil } // VolumeInUse checks if any container is using the volume. // It returns a slice of the IDs of the containers using the given // volume. If the slice is empty, no containers use the given volume. func (s *SQLiteState) VolumeInUse(volume *Volume) ([]string, error) { if !s.valid { return nil, define.ErrDBClosed } if !volume.valid { return nil, define.ErrVolumeRemoved } rows, err := s.conn.Query("SELECT ContainerID FROM ContainerVolume WHERE VolumeName=?;", volume.Name()) if err != nil { return nil, fmt.Errorf("querying database for containers using volume %s: %w", volume.Name(), err) } defer rows.Close() var ctrs []string for rows.Next() { var ctr string if err := rows.Scan(&ctr); err != nil { return nil, fmt.Errorf("scanning container ID for container using volume %s: %w", volume.Name(), err) } ctrs = append(ctrs, ctr) } return ctrs, nil } // ContainerIDIsVolume checks if the given c/storage container ID is used as // backing storage for a volume. func (s *SQLiteState) ContainerIDIsVolume(id string) (bool, error) { if !s.valid { return false, define.ErrDBClosed } row := s.conn.QueryRow("SELECT 1 FROM VolumeConfig WHERE StorageID=?;", id) var checkDigit int if err := row.Scan(&checkDigit); err != nil { if errors.Is(err, sql.ErrNoRows) { return false, nil } return false, fmt.Errorf("error retrieving volumes using storage ID %s: %w", id, err) } if checkDigit != 1 { return false, fmt.Errorf("check digit for volumes using storage ID %s was incorrect: %w", id, define.ErrInternal) } return true, nil }