Implement exec session handling in SQL database

Signed-off-by: Matt Heon <mheon@redhat.com>
This commit is contained in:
Matt Heon
2023-02-21 14:04:31 -05:00
parent 627a5b73bf
commit c0b92bdbc7
2 changed files with 88 additions and 223 deletions

View File

@ -944,8 +944,7 @@ func (s *SQLiteState) PruneContainerExitCodes() (defErr error) {
} }
// AddExecSession adds an exec session to the state. // AddExecSession adds an exec session to the state.
// TODO TODO TODO func (s *SQLiteState) AddExecSession(ctr *Container, session *ExecSession) (defErr error) {
func (s *SQLiteState) AddExecSession(ctr *Container, session *ExecSession) error {
if !s.valid { if !s.valid {
return define.ErrDBClosed return define.ErrDBClosed
} }
@ -954,54 +953,27 @@ func (s *SQLiteState) AddExecSession(ctr *Container, session *ExecSession) error
return define.ErrCtrRemoved return define.ErrCtrRemoved
} }
return define.ErrNotImplemented tx, err := s.conn.Begin()
if err != nil {
return fmt.Errorf("beginning container %s exec session %s add transaction: %w", ctr.ID(), session.Id, err)
}
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Rolling back transaction to add container %s exec session %s: %v", ctr.ID(), session.Id, err)
}
}
}()
// db, err := s.getDBCon() if _, err := tx.Exec("INSERT INTO ContainerExecSession VALUES (?, ?);", session.Id, ctr.ID()); err != nil {
// if err != nil { return fmt.Errorf("adding container %s exec session %s to database: %w", ctr.ID(), session.Id, err)
// return err }
// }
// defer s.deferredCloseDBCon(db)
// ctrID := []byte(ctr.ID()) if err := tx.Commit(); err != nil {
// sessionID := []byte(session.ID()) return fmt.Errorf("committing container %s exec session %s addition: %w", ctr.ID(), session.Id, err)
}
// err = db.Update(func(tx *bolt.Tx) error { return nil
// execBucket, err := getExecBucket(tx)
// if err != nil {
// return err
// }
// ctrBucket, err := getCtrBucket(tx)
// if err != nil {
// return err
// }
// dbCtr := ctrBucket.Bucket(ctrID)
// if dbCtr == nil {
// ctr.valid = false
// return fmt.Errorf("container %s is not present in the database: %w", ctr.ID(), define.ErrNoSuchCtr)
// }
// ctrExecSessionBucket, err := dbCtr.CreateBucketIfNotExists(execBkt)
// if err != nil {
// return fmt.Errorf("creating exec sessions bucket for container %s: %w", ctr.ID(), err)
// }
// execExists := execBucket.Get(sessionID)
// if execExists != nil {
// return fmt.Errorf("an exec session with ID %s already exists: %w", session.ID(), define.ErrExecSessionExists)
// }
// if err := execBucket.Put(sessionID, ctrID); err != nil {
// return fmt.Errorf("adding exec session %s to DB: %w", session.ID(), err)
// }
// if err := ctrExecSessionBucket.Put(sessionID, ctrID); err != nil {
// return fmt.Errorf("adding exec session %s to container %s in DB: %w", session.ID(), ctr.ID(), err)
// }
// return nil
// })
// return err
} }
// GetExecSession returns the ID of the container an exec session is associated // GetExecSession returns the ID of the container an exec session is associated
@ -1015,98 +987,55 @@ func (s *SQLiteState) GetExecSession(id string) (string, error) {
return "", define.ErrEmptyID return "", define.ErrEmptyID
} }
return "", define.ErrNotImplemented row := s.conn.QueryRow("SELECT ContainerID FROM ContainerExecSession WHERE ID=?;", id)
// db, err := s.getDBCon() var ctrID string
// if err != nil { if err := row.Scan(&ctrID); err != nil {
// return "", err if errors.Is(err, sql.ErrNoRows) {
// } return "", fmt.Errorf("no exec session with ID %s found: %w", id, define.ErrNoSuchExecSession)
// defer s.deferredCloseDBCon(db) }
return "", fmt.Errorf("retrieving exec session %s from database: %w", id, err)
}
// ctrID := "" return ctrID, nil
// err = db.View(func(tx *bolt.Tx) error {
// execBucket, err := getExecBucket(tx)
// if err != nil {
// return err
// }
// ctr := execBucket.Get([]byte(id))
// if ctr == nil {
// return fmt.Errorf("no exec session with ID %s found: %w", id, define.ErrNoSuchExecSession)
// }
// ctrID = string(ctr)
// return nil
// })
// return ctrID, err
} }
// RemoveExecSession removes references to the given exec session in the // RemoveExecSession removes references to the given exec session in the
// database. // database.
func (s *SQLiteState) RemoveExecSession(session *ExecSession) error { func (s *SQLiteState) RemoveExecSession(session *ExecSession) (defErr error) {
if !s.valid { if !s.valid {
return define.ErrDBClosed return define.ErrDBClosed
} }
return define.ErrNotImplemented tx, err := s.conn.Begin()
if err != nil {
return fmt.Errorf("beginning container %s exec session %s remove transaction: %w", session.ContainerId, session.Id, err)
}
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Rolling back transaction to remove container %s exec session %s: %v", session.ContainerID, session.Id, err)
}
}
}()
// db, err := s.getDBCon() result, err := tx.Exec("DELETE FROM ContainerExecSession WHERE ID=?;", session.Id)
// if err != nil { if err != nil {
// return err return fmt.Errorf("removing container %s exec session %s from database: %w", session.ContainerId, session.Id, err)
// } }
// defer s.deferredCloseDBCon(db) rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("retrieving container %s exec session %s removal rows modified: %w", session.ContainerId, session.Id, err)
}
if rows == 0 {
return define.ErrNoSuchExecSession
}
// sessionID := []byte(session.ID()) if err := tx.Commit(); err != nil {
// containerID := []byte(session.ContainerID()) return fmt.Errorf("committing container %s exec session %s removal: %w", session.ContainerId, session.Id, err)
// err = db.Update(func(tx *bolt.Tx) error { }
// execBucket, err := getExecBucket(tx)
// if err != nil {
// return err
// }
// ctrBucket, err := getCtrBucket(tx)
// if err != nil {
// return err
// }
// sessionExists := execBucket.Get(sessionID) return nil
// if sessionExists == nil {
// return define.ErrNoSuchExecSession
// }
// // Check that container ID matches
// if string(sessionExists) != session.ContainerID() {
// return fmt.Errorf("database inconsistency: exec session %s points to container %s in state but %s in database: %w", session.ID(), session.ContainerID(), string(sessionExists), define.ErrInternal)
// }
// if err := execBucket.Delete(sessionID); err != nil {
// return fmt.Errorf("removing exec session %s from database: %w", session.ID(), err)
// }
// dbCtr := ctrBucket.Bucket(containerID)
// if dbCtr == nil {
// // State is inconsistent. We refer to a container that
// // is no longer in the state.
// // Return without error, to attempt to recover.
// return nil
// }
// ctrExecBucket := dbCtr.Bucket(execBkt)
// if ctrExecBucket == nil {
// // Again, state is inconsistent. We should have an exec
// // bucket, and it should have this session.
// // Again, nothing we can do, so proceed and try to
// // recover.
// return nil
// }
// ctrSessionExists := ctrExecBucket.Get(sessionID)
// if ctrSessionExists != nil {
// if err := ctrExecBucket.Delete(sessionID); err != nil {
// return fmt.Errorf("removing exec session %s from container %s in database: %w", session.ID(), session.ContainerID(), err)
// }
// }
// return nil
// })
// return err
} }
// GetContainerExecSessions retrieves the IDs of all exec sessions running in a // GetContainerExecSessions retrieves the IDs of all exec sessions running in a
@ -1120,48 +1049,27 @@ func (s *SQLiteState) GetContainerExecSessions(ctr *Container) ([]string, error)
return nil, define.ErrCtrRemoved return nil, define.ErrCtrRemoved
} }
return nil, define.ErrNotImplemented rows, err := s.conn.Query("SELECT ID FROM ContainerExecSession WHERE ContainerID=?;", ctr.ID())
if err != nil {
return nil, fmt.Errorf("querying container %s exec sessions: %w", ctr.ID(), err)
}
defer rows.Close()
// db, err := s.getDBCon() var sessions []string
// if err != nil { for rows.Next() {
// return nil, err var session string
// } if err := rows.Scan(&session); err != nil {
// defer s.deferredCloseDBCon(db) return nil, fmt.Errorf("scanning container %s exec sessions row: %w", ctr.ID(), err)
}
sessions = append(sessions, session)
}
// ctrID := []byte(ctr.ID()) return sessions, nil
// sessions := []string{}
// err = db.View(func(tx *bolt.Tx) error {
// ctrBucket, err := getCtrBucket(tx)
// if err != nil {
// return err
// }
// dbCtr := ctrBucket.Bucket(ctrID)
// if dbCtr == nil {
// ctr.valid = false
// return define.ErrNoSuchCtr
// }
// ctrExecSessions := dbCtr.Bucket(execBkt)
// if ctrExecSessions == nil {
// return nil
// }
// return ctrExecSessions.ForEach(func(id, unused []byte) error {
// sessions = append(sessions, string(id))
// return nil
// })
// })
// if err != nil {
// return nil, err
// }
// return sessions, nil
} }
// RemoveContainerExecSessions removes all exec sessions attached to a given // RemoveContainerExecSessions removes all exec sessions attached to a given
// container. // container.
func (s *SQLiteState) RemoveContainerExecSessions(ctr *Container) error { func (s *SQLiteState) RemoveContainerExecSessions(ctr *Container) (defErr error) {
if !s.valid { if !s.valid {
return define.ErrDBClosed return define.ErrDBClosed
} }
@ -1170,69 +1078,27 @@ func (s *SQLiteState) RemoveContainerExecSessions(ctr *Container) error {
return define.ErrCtrRemoved return define.ErrCtrRemoved
} }
return define.ErrNotImplemented tx, err := s.conn.Begin()
if err != nil {
return fmt.Errorf("beginning container %s exec session removal transaction: %w", ctr.ID(), err)
}
defer func() {
if defErr != nil {
if err := tx.Rollback(); err != nil {
logrus.Errorf("Rolling back transaction to remove container %s exec sessions: %v", ctr.ID(), err)
}
}
}()
// db, err := s.getDBCon() if _, err := tx.Exec("DELETE FROM ContainerExecSessions WHERE ContainerID=?;", ctr.ID()); err != nil {
// if err != nil { return fmt.Errorf("removing container %s exec sessions from database: %w", ctr.ID(), err)
// return err }
// }
// defer s.deferredCloseDBCon(db)
// ctrID := []byte(ctr.ID()) if err := tx.Commit(); err != nil {
// sessions := []string{} return fmt.Errorf("committing container %s exec session removal: %w", ctr.ID(), err)
}
// err = db.Update(func(tx *bolt.Tx) error { return nil
// execBucket, err := getExecBucket(tx)
// if err != nil {
// return err
// }
// ctrBucket, err := getCtrBucket(tx)
// if err != nil {
// return err
// }
// dbCtr := ctrBucket.Bucket(ctrID)
// if dbCtr == nil {
// ctr.valid = false
// return define.ErrNoSuchCtr
// }
// ctrExecSessions := dbCtr.Bucket(execBkt)
// if ctrExecSessions == nil {
// return nil
// }
// err = ctrExecSessions.ForEach(func(id, unused []byte) error {
// sessions = append(sessions, string(id))
// return nil
// })
// if err != nil {
// return err
// }
// for _, session := range sessions {
// if err := ctrExecSessions.Delete([]byte(session)); err != nil {
// return fmt.Errorf("removing container %s exec session %s from database: %w", ctr.ID(), session, err)
// }
// // Check if the session exists in the global table
// // before removing. It should, but in cases where the DB
// // has become inconsistent, we should try and proceed
// // so we can recover.
// sessionExists := execBucket.Get([]byte(session))
// if sessionExists == nil {
// continue
// }
// if string(sessionExists) != ctr.ID() {
// return fmt.Errorf("database mismatch: exec session %s is associated with containers %s and %s: %w", session, ctr.ID(), string(sessionExists), define.ErrInternal)
// }
// if err := execBucket.Delete([]byte(session)); err != nil {
// return fmt.Errorf("removing container %s exec session %s from exec sessions: %w", ctr.ID(), session, err)
// }
// }
// return nil
// })
// return err
} }
// RewriteContainerConfig rewrites a container's configuration. // RewriteContainerConfig rewrites a container's configuration.

View File

@ -93,7 +93,6 @@ func sqliteInitTables(conn *sql.DB) (defErr error) {
CREATE TABLE IF NOT EXISTS ContainerExecSession( CREATE TABLE IF NOT EXISTS ContainerExecSession(
ID TEXT PRIMARY KEY NOT NULL, ID TEXT PRIMARY KEY NOT NULL,
ContainerID TEXT NOT NULL, ContainerID TEXT NOT NULL,
JSON TEXT NOT NULL,
FOREIGN KEY (ContainerID) REFERENCES ContainerConfig(ID) FOREIGN KEY (ContainerID) REFERENCES ContainerConfig(ID)
);` );`