Add tracking for container exec sessions to DB

Signed-off-by: Matthew Heon <matthew.heon@gmail.com>

Closes: #412
Approved by: baude
This commit is contained in:
Matthew Heon
2018-02-23 15:28:56 -05:00
committed by Atomic Bot
parent 920b66707e
commit 2a0c949b9b
6 changed files with 217 additions and 12 deletions

View File

@ -140,6 +140,9 @@ func (s *BoltState) Refresh() error {
state.Mountpoint = "" state.Mountpoint = ""
state.Mounted = false state.Mounted = false
state.State = ContainerStateConfigured state.State = ContainerStateConfigured
state.IPAddress = ""
state.SubnetMask = ""
state.ExecSessions = make(map[string]int)
newStateBytes, err := json.Marshal(state) newStateBytes, err := json.Marshal(state)
if err != nil { if err != nil {

View File

@ -141,6 +141,9 @@ type containerState struct {
IPAddress string `json:"ipAddress"` IPAddress string `json:"ipAddress"`
// Subnet mask of container (if network namespace was created) // Subnet mask of container (if network namespace was created)
SubnetMask string `json:"subnetMask"` SubnetMask string `json:"subnetMask"`
// ExecSessions contains active exec sessions for container
// Exec session ID is mapped to PID of exec process
ExecSessions map[string]int `json:"execSessions,omitempty"`
} }
// ContainerConfig contains all information that was used to create the // ContainerConfig contains all information that was used to create the
@ -574,7 +577,8 @@ func (c *Container) OOMKilled() (bool, error) {
} }
// PID returns the PID of the container // PID returns the PID of the container
// An error is returned if the container is not running // If the container is not running, a pid of 0 will be returned. No error will
// occur.
func (c *Container) PID() (int, error) { func (c *Container) PID() (int, error) {
if !c.locked { if !c.locked {
c.lock.Lock() c.lock.Lock()
@ -588,6 +592,26 @@ func (c *Container) PID() (int, error) {
return c.state.PID, nil return c.state.PID, nil
} }
// ExecSessions retrieves active exec sessions running in the container
// The result is a map from session ID to the PID of the exec process
func (c *Container) ExecSessions() (map[string]int, error) {
if !c.locked {
c.lock.Lock()
defer c.lock.Unlock()
if err := c.syncContainer(); err != nil {
return nil, err
}
}
returnMap := make(map[string]int, len(c.state.ExecSessions))
for k, v := range c.state.ExecSessions {
returnMap[k] = v
}
return returnMap, nil
}
// Misc Accessors // Misc Accessors
// Most will require locking // Most will require locking

View File

@ -2439,7 +2439,7 @@ func (j *containerState) MarshalJSONBuf(buf fflib.EncodingBuffer) error {
var obj []byte var obj []byte
_ = obj _ = obj
_ = err _ = err
buf.WriteString(`{"state":`) buf.WriteString(`{ "state":`)
fflib.FormatBits2(buf, uint64(j.State), 10, j.State < 0) fflib.FormatBits2(buf, uint64(j.State), 10, j.State < 0)
buf.WriteByte(',') buf.WriteByte(',')
if len(j.ConfigPath) != 0 { if len(j.ConfigPath) != 0 {
@ -2515,6 +2515,24 @@ func (j *containerState) MarshalJSONBuf(buf fflib.EncodingBuffer) error {
fflib.WriteJsonString(buf, string(j.IPAddress)) fflib.WriteJsonString(buf, string(j.IPAddress))
buf.WriteString(`,"subnetMask":`) buf.WriteString(`,"subnetMask":`)
fflib.WriteJsonString(buf, string(j.SubnetMask)) fflib.WriteJsonString(buf, string(j.SubnetMask))
buf.WriteByte(',')
if len(j.ExecSessions) != 0 {
if j.ExecSessions == nil {
buf.WriteString(`"execSessions":null`)
} else {
buf.WriteString(`"execSessions":{ `)
for key, value := range j.ExecSessions {
fflib.WriteJsonString(buf, key)
buf.WriteString(`:`)
fflib.FormatBits2(buf, uint64(value), 10, value < 0)
buf.WriteByte(',')
}
buf.Rewind(1)
buf.WriteByte('}')
}
buf.WriteByte(',')
}
buf.Rewind(1)
buf.WriteByte('}') buf.WriteByte('}')
return nil return nil
} }
@ -2546,6 +2564,8 @@ const (
ffjtcontainerStateIPAddress ffjtcontainerStateIPAddress
ffjtcontainerStateSubnetMask ffjtcontainerStateSubnetMask
ffjtcontainerStateExecSessions
) )
var ffjKeycontainerStateState = []byte("state") var ffjKeycontainerStateState = []byte("state")
@ -2572,6 +2592,8 @@ var ffjKeycontainerStateIPAddress = []byte("ipAddress")
var ffjKeycontainerStateSubnetMask = []byte("subnetMask") var ffjKeycontainerStateSubnetMask = []byte("subnetMask")
var ffjKeycontainerStateExecSessions = []byte("execSessions")
// UnmarshalJSON umarshall json - template of ffjson // UnmarshalJSON umarshall json - template of ffjson
func (j *containerState) UnmarshalJSON(input []byte) error { func (j *containerState) UnmarshalJSON(input []byte) error {
fs := fflib.NewFFLexer(input) fs := fflib.NewFFLexer(input)
@ -2647,6 +2669,11 @@ mainparse:
currentKey = ffjtcontainerStateExitCode currentKey = ffjtcontainerStateExitCode
state = fflib.FFParse_want_colon state = fflib.FFParse_want_colon
goto mainparse goto mainparse
} else if bytes.Equal(ffjKeycontainerStateExecSessions, kn) {
currentKey = ffjtcontainerStateExecSessions
state = fflib.FFParse_want_colon
goto mainparse
} }
case 'f': case 'f':
@ -2722,6 +2749,12 @@ mainparse:
} }
if fflib.EqualFoldRight(ffjKeycontainerStateExecSessions, kn) {
currentKey = ffjtcontainerStateExecSessions
state = fflib.FFParse_want_colon
goto mainparse
}
if fflib.EqualFoldRight(ffjKeycontainerStateSubnetMask, kn) { if fflib.EqualFoldRight(ffjKeycontainerStateSubnetMask, kn) {
currentKey = ffjtcontainerStateSubnetMask currentKey = ffjtcontainerStateSubnetMask
state = fflib.FFParse_want_colon state = fflib.FFParse_want_colon
@ -2847,6 +2880,9 @@ mainparse:
case ffjtcontainerStateSubnetMask: case ffjtcontainerStateSubnetMask:
goto handle_SubnetMask goto handle_SubnetMask
case ffjtcontainerStateExecSessions:
goto handle_ExecSessions
case ffjtcontainerStatenosuchkey: case ffjtcontainerStatenosuchkey:
err = fs.SkipField(tok) err = fs.SkipField(tok)
if err != nil { if err != nil {
@ -3201,6 +3237,115 @@ handle_SubnetMask:
state = fflib.FFParse_after_value state = fflib.FFParse_after_value
goto mainparse goto mainparse
handle_ExecSessions:
/* handler: j.ExecSessions type=map[string]int kind=map quoted=false*/
{
{
if tok != fflib.FFTok_left_bracket && tok != fflib.FFTok_null {
return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for ", tok))
}
}
if tok == fflib.FFTok_null {
j.ExecSessions = nil
} else {
j.ExecSessions = make(map[string]int, 0)
wantVal := true
for {
var k string
var tmpJExecSessions int
tok = fs.Scan()
if tok == fflib.FFTok_error {
goto tokerror
}
if tok == fflib.FFTok_right_bracket {
break
}
if tok == fflib.FFTok_comma {
if wantVal == true {
// TODO(pquerna): this isn't an ideal error message, this handles
// things like [,,,] as an array value.
return fs.WrapErr(fmt.Errorf("wanted value token, but got token: %v", tok))
}
continue
} else {
wantVal = true
}
/* handler: k type=string kind=string quoted=false*/
{
{
if tok != fflib.FFTok_string && tok != fflib.FFTok_null {
return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok))
}
}
if tok == fflib.FFTok_null {
} else {
outBuf := fs.Output.Bytes()
k = string(string(outBuf))
}
}
// Expect ':' after key
tok = fs.Scan()
if tok != fflib.FFTok_colon {
return fs.WrapErr(fmt.Errorf("wanted colon token, but got token: %v", tok))
}
tok = fs.Scan()
/* handler: tmpJExecSessions type=int kind=int quoted=false*/
{
if tok != fflib.FFTok_integer && tok != fflib.FFTok_null {
return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for int", tok))
}
}
{
if tok == fflib.FFTok_null {
} else {
tval, err := fflib.ParseInt(fs.Output.Bytes(), 10, 64)
if err != nil {
return fs.WrapErr(err)
}
tmpJExecSessions = int(tval)
}
}
j.ExecSessions[k] = tmpJExecSessions
wantVal = false
}
}
}
state = fflib.FFParse_after_value
goto mainparse
wantedvalue: wantedvalue:
return fs.WrapErr(fmt.Errorf("wanted value token, but got token: %v", tok)) return fs.WrapErr(fmt.Errorf("wanted value token, but got token: %v", tok))
wrongtokenerror: wrongtokenerror:

View File

@ -14,7 +14,7 @@ import (
// DBSchema is the current DB schema version // DBSchema is the current DB schema version
// Increments every time a change is made to the database's tables // Increments every time a change is made to the database's tables
const DBSchema = 11 const DBSchema = 12
// SQLState is a state implementation backed by a persistent SQLite3 database // SQLState is a state implementation backed by a persistent SQLite3 database
type SQLState struct { type SQLState struct {
@ -102,7 +102,8 @@ func (s *SQLState) Refresh() (err error) {
Pid=?, Pid=?,
NetNSPath=?, NetNSPath=?,
IPAddress=?, IPAddress=?,
SubnetMask=?;` SubnetMask=?,
ExecSessions=?;`
if !s.valid { if !s.valid {
return ErrDBClosed return ErrDBClosed
@ -132,7 +133,8 @@ func (s *SQLState) Refresh() (err error) {
0, 0,
"", "",
"", "",
"") "",
"{}")
if err != nil { if err != nil {
return errors.Wrapf(err, "error refreshing database state") return errors.Wrapf(err, "error refreshing database state")
} }
@ -269,7 +271,8 @@ func (s *SQLState) UpdateContainer(ctr *Container) error {
Pid, Pid,
NetNSPath, NetNSPath,
IPAddress, IPAddress,
SubnetMask SubnetMask,
ExecSessions
FROM containerState WHERE ID=?;` FROM containerState WHERE ID=?;`
var ( var (
@ -285,6 +288,7 @@ func (s *SQLState) UpdateContainer(ctr *Container) error {
netNSPath string netNSPath string
ipAddress string ipAddress string
subnetMask string subnetMask string
execSessions string
) )
if !s.valid { if !s.valid {
@ -308,7 +312,8 @@ func (s *SQLState) UpdateContainer(ctr *Container) error {
&pid, &pid,
&netNSPath, &netNSPath,
&ipAddress, &ipAddress,
&subnetMask) &subnetMask,
&execSessions)
if err != nil { if err != nil {
// The container may not exist in the database // The container may not exist in the database
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
@ -333,6 +338,11 @@ func (s *SQLState) UpdateContainer(ctr *Container) error {
newState.IPAddress = ipAddress newState.IPAddress = ipAddress
newState.SubnetMask = subnetMask newState.SubnetMask = subnetMask
newState.ExecSessions = make(map[string]int)
if err := json.Unmarshal([]byte(execSessions), &newState.ExecSessions); err != nil {
return errors.Wrapf(err, "error parsing container %s exec sessions", ctr.ID())
}
if newState.Mountpoint != "" { if newState.Mountpoint != "" {
newState.Mounted = true newState.Mounted = true
} }
@ -395,13 +405,19 @@ func (s *SQLState) SaveContainer(ctr *Container) (err error) {
Pid=?, Pid=?,
NetNSPath=?, NetNSPath=?,
IPAddress=?, IPAddress=?,
SubnetMask=? SubnetMask=?,
ExecSessions=?
WHERE Id=?;` WHERE Id=?;`
if !ctr.valid { if !ctr.valid {
return ErrCtrRemoved return ErrCtrRemoved
} }
execSessionsJSON, err := json.Marshal(ctr.state.ExecSessions)
if err != nil {
return errors.Wrapf(err, "error marshalling container %s exec sessions", ctr.ID())
}
netNSPath := "" netNSPath := ""
if ctr.state.NetNS != nil { if ctr.state.NetNS != nil {
netNSPath = ctr.state.NetNS.Path() netNSPath = ctr.state.NetNS.Path()
@ -439,6 +455,7 @@ func (s *SQLState) SaveContainer(ctr *Container) (err error) {
netNSPath, netNSPath,
ctr.state.IPAddress, ctr.state.IPAddress,
ctr.state.SubnetMask, ctr.state.SubnetMask,
execSessionsJSON,
ctr.ID()) ctr.ID())
if err != nil { if err != nil {
return errors.Wrapf(err, "error updating container %s state in database", ctr.ID()) return errors.Wrapf(err, "error updating container %s state in database", ctr.ID())

View File

@ -33,7 +33,8 @@ const (
containerState.Pid, containerState.Pid,
containerState.NetNSPath, containerState.NetNSPath,
containerState.IPAddress, containerState.IPAddress,
containerState.SubnetMask containerState.SubnetMask,
containerState.ExecSessions
FROM containers FROM containers
INNER JOIN INNER JOIN
containerState ON containers.Id = containerState.Id ` containerState ON containers.Id = containerState.Id `
@ -274,6 +275,7 @@ func prepareDB(db *sql.DB) (err error) {
NetNSPath TEXT NOT NULL, NetNSPath TEXT NOT NULL,
IPAddress TEXT NOT NULL, IPAddress TEXT NOT NULL,
SubnetMask TEXT NOT NULL, SubnetMask TEXT NOT NULL,
ExecSessions TEXT NOT NULL,
CHECK (State>0), CHECK (State>0),
CHECK (OomKilled IN (0, 1)), CHECK (OomKilled IN (0, 1)),
@ -483,6 +485,7 @@ func (s *SQLState) ctrFromScannable(row scannable) (*Container, error) {
netNSPath string netNSPath string
ipAddress string ipAddress string
subnetMask string subnetMask string
execSessions string
) )
err := row.Scan( err := row.Scan(
@ -536,7 +539,8 @@ func (s *SQLState) ctrFromScannable(row scannable) (*Container, error) {
&pid, &pid,
&netNSPath, &netNSPath,
&ipAddress, &ipAddress,
&subnetMask) &subnetMask,
&execSessions)
if err != nil { if err != nil {
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
return nil, ErrNoSuchCtr return nil, ErrNoSuchCtr
@ -616,6 +620,11 @@ func (s *SQLState) ctrFromScannable(row scannable) (*Container, error) {
return nil, errors.Wrapf(err, "error parsing container %s DNS server JSON", id) return nil, errors.Wrapf(err, "error parsing container %s DNS server JSON", id)
} }
ctr.state.ExecSessions = make(map[string]int)
if err := json.Unmarshal([]byte(execSessions), &ctr.state.ExecSessions); err != nil {
return nil, errors.Wrapf(err, "error parsing container %s exec sessions JSON", id)
}
labels := make(map[string]string) labels := make(map[string]string)
if err := json.Unmarshal([]byte(labelsJSON), &labels); err != nil { if err := json.Unmarshal([]byte(labelsJSON), &labels); err != nil {
return nil, errors.Wrapf(err, "error parsing container %s labels JSON", id) return nil, errors.Wrapf(err, "error parsing container %s labels JSON", id)
@ -753,7 +762,7 @@ func (s *SQLState) addContainer(ctr *Container, pod *Pod) (err error) {
addCtrState = `INSERT INTO containerState VALUES ( addCtrState = `INSERT INTO containerState VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ? ?, ?, ?, ?
);` );`
addRegistry = "INSERT INTO registry VALUES (?, ?);" addRegistry = "INSERT INTO registry VALUES (?, ?);"
checkCtrInPod = "SELECT 1 FROM containers WHERE Id=? AND Pod=?;" checkCtrInPod = "SELECT 1 FROM containers WHERE Id=? AND Pod=?;"
@ -795,6 +804,11 @@ func (s *SQLState) addContainer(ctr *Container, pod *Pod) (err error) {
return errors.Wrapf(err, "error marshaling container %s labels to JSON", ctr.ID()) return errors.Wrapf(err, "error marshaling container %s labels to JSON", ctr.ID())
} }
execSessionsJSON, err := json.Marshal(ctr.state.ExecSessions)
if err != nil {
return errors.Wrapf(err, "error marshalling container %s exec sessions to JSON", ctr.ID())
}
netNSPath := "" netNSPath := ""
if ctr.state.NetNS != nil { if ctr.state.NetNS != nil {
netNSPath = ctr.state.NetNS.Path() netNSPath = ctr.state.NetNS.Path()
@ -918,7 +932,8 @@ func (s *SQLState) addContainer(ctr *Container, pod *Pod) (err error) {
ctr.state.PID, ctr.state.PID,
netNSPath, netNSPath,
ctr.state.IPAddress, ctr.state.IPAddress,
ctr.state.SubnetMask) ctr.state.SubnetMask,
execSessionsJSON)
if err != nil { if err != nil {
return errors.Wrapf(err, "error adding container %s state to database", ctr.ID()) return errors.Wrapf(err, "error adding container %s state to database", ctr.ID())
} }

View File

@ -55,6 +55,7 @@ func getTestContainer(id, name, locksDir string) (*Container, error) {
Mounted: true, Mounted: true,
Mountpoint: "/does/not/exist/tmp/" + id, Mountpoint: "/does/not/exist/tmp/" + id,
PID: 1234, PID: 1234,
ExecSessions: map[string]int{"abcd": 101, "ef01": 202},
}, },
valid: true, valid: true,
} }