Implement pod operations in SQL state

Signed-off-by: Matthew Heon <matthew.heon@gmail.com>

Closes: #268
Approved by: rhatdan
This commit is contained in:
Matthew Heon
2018-01-22 10:55:33 -05:00
committed by Atomic Bot
parent 6214be07c2
commit cfd6da22df
2 changed files with 655 additions and 306 deletions

View File

@ -3,7 +3,6 @@ package libpod
import (
"database/sql"
"encoding/json"
"io/ioutil"
"os"
"github.com/pkg/errors"
@ -148,23 +147,7 @@ func (s *SQLState) Refresh() (err error) {
// Container retrieves a container from its full ID
func (s *SQLState) Container(id string) (*Container, error) {
const query = `SELECT containers.*,
containerState.State,
containerState.ConfigPath,
containerState.RunDir,
containerState.MountPoint,
containerState.StartedTime,
containerState.FinishedTime,
containerState.ExitCode,
containerState.OomKilled,
containerState.Pid,
containerState.NetNSPath,
containerState.IPAddress,
containerState.SubnetMask
FROM containers
INNER JOIN
containerState ON containers.Id = containerState.Id
WHERE containers.Id=?;`
const query = containerQuery + "WHERE containers.Id=?;"
if id == "" {
return nil, ErrEmptyID
@ -186,23 +169,7 @@ func (s *SQLState) Container(id string) (*Container, error) {
// LookupContainer retrieves a container by full or unique partial ID or name
func (s *SQLState) LookupContainer(idOrName string) (*Container, error) {
const query = `SELECT containers.*,
containerState.State,
containerState.ConfigPath,
containerState.RunDir,
containerState.MountPoint,
containerState.StartedTime,
containerState.FinishedTime,
containerState.ExitCode,
containerState.OomKilled,
containerState.Pid,
containerState.NetNSPath,
containerState.IPAddress,
containerState.SubnetMask
FROM containers
INNER JOIN
containerState ON containers.Id = containerState.Id
WHERE (containers.Id LIKE ?) OR containers.Name=?;`
const query = containerQuery + "WHERE (containers.Id LIKE ?) OR containers.Name=?;"
if idOrName == "" {
return nil, ErrEmptyID
@ -277,189 +244,15 @@ func (s *SQLState) HasContainer(id string) (bool, error) {
// If the container belongs to a pod, that pod must already be present in the
// state, and the container will be added to the pod
func (s *SQLState) AddContainer(ctr *Container) (err error) {
const (
addCtr = `INSERT INTO containers VALUES (
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?
);`
addCtrState = `INSERT INTO containerState VALUES (
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?
);`
)
if !s.valid {
return ErrDBClosed
}
if !ctr.valid {
return ErrCtrRemoved
}
mounts, err := json.Marshal(ctr.config.Mounts)
if err != nil {
return errors.Wrapf(err, "error marshaling container %s mounts to JSON", ctr.ID())
if ctr.config.Pod != "" {
return errors.Wrapf(ErrPodExists, "cannot add container that belongs to a pod, use AddContainerToPod instead")
}
dnsServerJSON, err := json.Marshal(ctr.config.DNSServer)
if err != nil {
return errors.Wrapf(err, "error marshaling container %s DNS servers to JSON", ctr.ID())
}
dnsSearchJSON, err := json.Marshal(ctr.config.DNSSearch)
if err != nil {
return errors.Wrapf(err, "error marshaling container %s DNS search domains to JSON", ctr.ID())
}
dnsOptionJSON, err := json.Marshal(ctr.config.DNSOption)
if err != nil {
return errors.Wrapf(err, "error marshaling container %s DNS options to JSON", ctr.ID())
}
hostAddJSON, err := json.Marshal(ctr.config.HostAdd)
if err != nil {
return errors.Wrapf(err, "error marshaling container %s hosts to JSON", ctr.ID())
}
labelsJSON, err := json.Marshal(ctr.config.Labels)
if err != nil {
return errors.Wrapf(err, "error marshaling container %s labels to JSON", ctr.ID())
}
netNSPath := ""
if ctr.state.NetNS != nil {
netNSPath = ctr.state.NetNS.Path()
}
specJSON, err := json.Marshal(ctr.config.Spec)
if err != nil {
return errors.Wrapf(err, "error marshalling container %s spec to JSON", ctr.ID())
}
portsJSON := []byte{}
if len(ctr.config.PortMappings) > 0 {
portsJSON, err = json.Marshal(&ctr.config.PortMappings)
if err != nil {
return errors.Wrapf(err, "error marshalling container %s port mappings to JSON", ctr.ID())
}
}
tx, err := s.db.Begin()
if err != nil {
return errors.Wrapf(err, "error beginning database transaction")
}
defer func() {
if err != nil {
if err2 := tx.Rollback(); err2 != nil {
logrus.Errorf("Error rolling back transaction to add container %s: %v", ctr.ID(), err2)
}
}
}()
// Add static container information
_, err = tx.Exec(addCtr,
ctr.ID(),
ctr.Name(),
stringToNullString(ctr.PodID()),
ctr.config.RootfsImageID,
ctr.config.RootfsImageName,
boolToSQL(ctr.config.ImageVolumes),
boolToSQL(ctr.config.ReadOnly),
ctr.config.ShmDir,
ctr.config.ShmSize,
ctr.config.StaticDir,
string(mounts),
ctr.LogPath(),
boolToSQL(ctr.config.Privileged),
boolToSQL(ctr.config.NoNewPrivs),
ctr.config.ProcessLabel,
ctr.config.MountLabel,
ctr.config.User,
stringToNullString(ctr.config.IPCNsCtr),
stringToNullString(ctr.config.MountNsCtr),
stringToNullString(ctr.config.NetNsCtr),
stringToNullString(ctr.config.PIDNsCtr),
stringToNullString(ctr.config.UserNsCtr),
stringToNullString(ctr.config.UTSNsCtr),
stringToNullString(ctr.config.CgroupNsCtr),
boolToSQL(ctr.config.CreateNetNS),
string(dnsServerJSON),
string(dnsSearchJSON),
string(dnsOptionJSON),
string(hostAddJSON),
boolToSQL(ctr.config.Stdin),
string(labelsJSON),
ctr.config.StopSignal,
ctr.config.StopTimeout,
timeToSQL(ctr.config.CreatedTime),
ctr.config.CgroupParent)
if err != nil {
return errors.Wrapf(err, "error adding static information for container %s to database", ctr.ID())
}
// Add container state to the database
_, err = tx.Exec(addCtrState,
ctr.ID(),
ctr.state.State,
ctr.state.ConfigPath,
ctr.state.RunDir,
ctr.state.Mountpoint,
timeToSQL(ctr.state.StartedTime),
timeToSQL(ctr.state.FinishedTime),
ctr.state.ExitCode,
boolToSQL(ctr.state.OOMKilled),
ctr.state.PID,
netNSPath,
ctr.state.IPAddress,
ctr.state.SubnetMask)
if err != nil {
return errors.Wrapf(err, "error adding container %s state to database", ctr.ID())
}
// Save the container's runtime spec to disk
specPath := getSpecPath(s.specsDir, ctr.ID())
if err := ioutil.WriteFile(specPath, specJSON, 0750); err != nil {
return errors.Wrapf(err, "error saving container %s spec JSON to disk", ctr.ID())
}
defer func() {
if err != nil {
if err2 := os.Remove(specPath); err2 != nil {
logrus.Errorf("Error removing container %s JSON spec from state: %v", ctr.ID(), err2)
}
}
}()
// If the container has port mappings, save them to disk
if len(ctr.config.PortMappings) > 0 {
portPath := getPortsPath(s.specsDir, ctr.ID())
if err := ioutil.WriteFile(portPath, portsJSON, 0750); err != nil {
return errors.Wrapf(err, "error saving container %s port JSON to disk", ctr.ID())
}
defer func() {
if err != nil {
if err2 := os.Remove(portPath); err2 != nil {
logrus.Errorf("Error removing container %s JSON ports from state: %v", ctr.ID(), err2)
}
}
}()
}
if err := tx.Commit(); err != nil {
return errors.Wrapf(err, "error committing transaction to add container %s", ctr.ID())
}
return nil
return s.addContainer(ctr)
}
// UpdateContainer updates a container's state from the database
@ -709,93 +502,18 @@ func (s *SQLState) ContainerInUse(ctr *Container) ([]string, error) {
return ids, nil
}
// RemoveContainer removes the container from the state
// RemoveContainer removes the given container from the state
func (s *SQLState) RemoveContainer(ctr *Container) error {
const (
removeCtr = "DELETE FROM containers WHERE Id=?;"
removeState = "DELETE FROM containerState WHERE ID=?;"
)
if !s.valid {
return ErrDBClosed
if ctr.config.Pod != "" {
return errors.Wrapf(ErrPodExists, "container %s belongs to a pod, use RemoveContainerFromPod", ctr.ID())
}
committed := false
tx, err := s.db.Begin()
if err != nil {
return errors.Wrapf(err, "error beginning database transaction")
}
defer func() {
if err != nil && !committed {
if err2 := tx.Rollback(); err2 != nil {
logrus.Errorf("Error rolling back transaction to add container %s: %v", ctr.ID(), err2)
}
}
}()
// Check rows acted on for the first transaction, verify we actually removed something
result, err := tx.Exec(removeCtr, ctr.ID())
if err != nil {
return errors.Wrapf(err, "error removing container %s from containers table", ctr.ID())
}
rows, err := result.RowsAffected()
if err != nil {
return errors.Wrapf(err, "error retrieving number of rows in transaction removing container %s", ctr.ID())
} else if rows == 0 {
return ErrNoSuchCtr
}
if _, err := tx.Exec(removeState, ctr.ID()); err != nil {
return errors.Wrapf(err, "error removing container %s from state table", ctr.ID())
}
if err := tx.Commit(); err != nil {
return errors.Wrapf(err, "error committing transaction to remove container %s", ctr.ID())
}
committed = true
// Remove the container's JSON from disk
jsonPath := getSpecPath(s.specsDir, ctr.ID())
if err := os.Remove(jsonPath); err != nil {
return errors.Wrapf(err, "error removing JSON spec from state for container %s", ctr.ID())
}
// Remove containers ports JSON from disk
// May not exist, so ignore os.IsNotExist
portsPath := getPortsPath(s.specsDir, ctr.ID())
if err := os.Remove(portsPath); err != nil {
if !os.IsNotExist(err) {
return errors.Wrapf(err, "error removing JSON ports from state for container %s", ctr.ID())
}
}
ctr.valid = false
return nil
return s.removeContainer(ctr)
}
// AllContainers retrieves all the containers presently in the state
func (s *SQLState) AllContainers() ([]*Container, error) {
// TODO maybe do an ORDER BY here?
const query = `SELECT containers.*,
containerState.State,
containerState.ConfigPath,
containerState.RunDir,
containerState.MountPoint,
containerState.StartedTime,
containerState.FinishedTime,
containerState.ExitCode,
containerState.OomKilled,
containerState.Pid,
containerState.NetNSPath,
containerState.IPAddress,
containerState.SubnetMask
FROM containers
INNER JOIN
containerState ON containers.Id = containerState.Id
ORDER BY containers.CreatedTime DESC;`
const query = containerQuery + ";"
if !s.valid {
return nil, ErrDBClosed
@ -826,59 +544,352 @@ func (s *SQLState) AllContainers() ([]*Container, error) {
// Pod retrieves a pod by its full ID
func (s *SQLState) Pod(id string) (*Pod, error) {
return nil, ErrNotImplemented
const query = "SELECT * FROM pods WHERE Id=?;"
if !s.valid {
return nil, ErrDBClosed
}
if id == "" {
return nil, ErrEmptyID
}
row := s.db.QueryRow(query, id)
pod, err := s.podFromScannable(row)
if err != nil {
return nil, errors.Wrapf(err, "error retrieving pod %s from database", id)
}
return pod, nil
}
// LookupPod retrieves a pot by full or unique partial ID or name
func (s *SQLState) LookupPod(idOrName string) (*Pod, error) {
return nil, ErrNotImplemented
const query = "SELECT * FROM pods WHERE (Id LIKE ?) OR Name=?;"
if idOrName == "" {
return nil, ErrEmptyID
}
if !s.valid {
return nil, ErrDBClosed
}
rows, err := s.db.Query(query, idOrName+"%", idOrName)
if err != nil {
return nil, errors.Wrapf(err, "error retrieving pod %s row from database", idOrName)
}
defer rows.Close()
foundResult := false
var pod *Pod
for rows.Next() {
if foundResult {
return nil, errors.Wrapf(ErrCtrExists, "more than one result for ID or name %s", idOrName)
}
var err error
pod, err = s.podFromScannable(rows)
if err != nil {
return nil, errors.Wrapf(err, "error retrieving pod %s from database", idOrName)
}
foundResult = true
}
if err := rows.Err(); err != nil {
return nil, errors.Wrapf(err, "error retrieving rows for pod ID or name %s", idOrName)
}
if !foundResult {
return nil, errors.Wrapf(ErrNoSuchCtr, "no pod with ID or name %s found", idOrName)
}
return pod, nil
}
// HasPod checks if a pod exists given its full ID
func (s *SQLState) HasPod(id string) (bool, error) {
return false, ErrNotImplemented
if id == "" {
return false, ErrEmptyID
}
if !s.valid {
return false, ErrDBClosed
}
return s.podExists(id)
}
// PodHasContainer checks if the given pod containers a container with the given
// ID
func (s *SQLState) PodHasContainer(pod *Pod, ctrID string) (bool, error) {
return false, ErrNotImplemented
const query = "SELECT 1 FROM containers WHERE Id=? AND Pod=?;"
if ctrID == "" {
return false, ErrEmptyID
}
if !s.valid {
return false, ErrDBClosed
}
if !pod.valid {
return false, ErrPodRemoved
}
row := s.db.QueryRow(query, ctrID, pod.ID())
var check int
err := row.Scan(&check)
if err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, errors.Wrapf(err, "error questing database for existence of container %s", ctrID)
} else if check != 1 {
return false, errors.Wrapf(ErrInternal, "check digit for PodHasContainer query incorrect")
}
return true, nil
}
// PodContainersByID returns the container IDs of all containers in the given
// pod
func (s *SQLState) PodContainersByID(pod *Pod) ([]string, error) {
return nil, ErrNotImplemented
const query = "SELECT Id FROM containers WHERE Pod=?;"
if !s.valid {
return nil, ErrDBClosed
}
if !pod.valid {
return nil, ErrPodRemoved
}
// Check to make sure pod still exists in DB
exists, err := s.podExists(pod.ID())
if err != nil {
return nil, err
}
if !exists {
pod.valid = false
return nil, ErrPodRemoved
}
// Get actual containers
rows, err := s.db.Query(query, pod.ID())
if err != nil {
return nil, errors.Wrapf(err, "error retrieving containers from database")
}
defer rows.Close()
containers := []string{}
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
if err == sql.ErrNoRows {
return nil, ErrNoSuchCtr
}
return nil, errors.Wrapf(err, "error parsing database row into container ID")
}
containers = append(containers, id)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrapf(err, "error retrieving container rows")
}
return containers, nil
}
// PodContainers returns all the containers in a pod given the pod's full ID
func (s *SQLState) PodContainers(pod *Pod) ([]*Container, error) {
return nil, ErrNotImplemented
const query = containerQuery + "WHERE containers.Pod=?;"
if !s.valid {
return nil, ErrDBClosed
}
if !pod.valid {
return nil, ErrPodRemoved
}
// Check to make sure pod still exists in DB
exists, err := s.podExists(pod.ID())
if err != nil {
return nil, err
}
if !exists {
pod.valid = false
return nil, ErrPodRemoved
}
// Get actual containers
rows, err := s.db.Query(query, pod.ID())
if err != nil {
return nil, errors.Wrapf(err, "error retrieving containers from database")
}
defer rows.Close()
containers := []*Container{}
for rows.Next() {
ctr, err := s.ctrFromScannable(rows)
if err != nil {
return nil, err
}
containers = append(containers, ctr)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrapf(err, "error retrieving container rows")
}
return containers, nil
}
// AddPod adds a pod to the state
// Only empty pods can be added to the state
func (s *SQLState) AddPod(pod *Pod) error {
return ErrNotImplemented
func (s *SQLState) AddPod(pod *Pod) (err error) {
const query = "INSERT INTO pods VALUES (?, ?, ?);"
if !s.valid {
return ErrDBClosed
}
if !pod.valid {
return ErrPodRemoved
}
labelsJSON, err := json.Marshal(pod.labels)
if err != nil {
return errors.Wrapf(err, "error marshaling pod %s labels to JSON", pod.ID())
}
tx, err := s.db.Begin()
if err != nil {
return errors.Wrapf(err, "error beginning database transaction")
}
defer func() {
if err != nil {
if err2 := tx.Rollback(); err2 != nil {
logrus.Errorf("Error rolling back transaction to add pod %s: %v", pod.ID(), err2)
}
}
}()
_, err = tx.Exec(query, pod.ID(), pod.Name(), string(labelsJSON))
if err != nil {
return errors.Wrapf(err, "error adding pod %s to database", pod.ID())
}
if err := tx.Commit(); err != nil {
return errors.Wrapf(err, "error committing transaction to add pod %s", pod.ID())
}
return nil
}
// RemovePod removes a pod from the state
// Only empty pods can be removed
func (s *SQLState) RemovePod(pod *Pod) error {
return ErrNotImplemented
const query = "DELETE FROM pods WHERE ID=?;"
if !s.valid {
return ErrDBClosed
}
tx, err := s.db.Begin()
if err != nil {
return errors.Wrapf(err, "error beginning database transaction")
}
defer func() {
if err != nil {
if err2 := tx.Rollback(); err2 != nil {
logrus.Errorf("Error rolling back transaction to remove pod %s: %v", pod.ID(), err2)
}
}
}()
// Check rows acted on for the first transaction, verify we actually removed something
result, err := tx.Exec(query, pod.ID())
if err != nil {
return errors.Wrapf(err, "error removing pod %s from containers table", pod.ID())
}
rows, err := result.RowsAffected()
if err != nil {
return errors.Wrapf(err, "error retrieving number of rows in transaction removing pod %s", pod.ID())
} else if rows == 0 {
return ErrNoSuchPod
}
if err := tx.Commit(); err != nil {
return errors.Wrapf(err, "error committing transaction to remove pod %s", pod.ID())
}
return nil
}
// AddContainerToPod adds a container to the given pod
func (s *SQLState) AddContainerToPod(pod *Pod, ctr *Container) error {
return ErrNotImplemented
if !pod.valid {
return ErrPodRemoved
}
if !ctr.valid {
return ErrCtrRemoved
}
if ctr.config.Pod != pod.ID() {
return errors.Wrapf(ErrInvalidArg, "container's pod ID does not match given pod's ID")
}
return s.addContainer(ctr)
}
// RemoveContainerFromPod removes a container from the given pod
func (s *SQLState) RemoveContainerFromPod(pod *Pod, ctr *Container) error {
return ErrNotImplemented
if ctr.config.Pod != pod.ID() {
return errors.Wrapf(ErrInvalidArg, "container %s is not in pod %s", ctr.ID(), pod.ID())
}
return s.removeContainer(ctr)
}
// AllPods retrieves all pods presently in the state
func (s *SQLState) AllPods() ([]*Pod, error) {
return nil, ErrNotImplemented
const query = "SELECT * FROM pods;"
if !s.valid {
return nil, ErrDBClosed
}
rows, err := s.db.Query(query)
if err != nil {
return nil, errors.Wrapf(err, "error querying database for all pods")
}
defer rows.Close()
pods := []*Pod{}
for rows.Next() {
pod, err := s.podFromScannable(rows)
if err != nil {
return nil, err
}
pods = append(pods, pod)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrapf(err, "error retrieving pod rows")
}
return pods, nil
}