Files
podman/libpod/container_log_linux.go
Valentin Rothberg d32863bbb4 podman image tree: restore previous behavior
The initial version of libimage changed the order of layers which has
now been restored to remain backwards compatible.

Further changes:

 * Fix a bug in the journald logging: trailing new lines must be
   stripped from the message.  The system tests did not pass due to
   empty new lines.  Triggered by changing the default logger to
   journald in containers/common.

 * Fix another bug in the journald logging which embedded the container
   ID inside the message rather than the specific field.  That surfaced
   as a preceding whitespace in each log line, which broke the system
   tests.

 * Alter the system tests to make sure that the k8s-file and the
   journald logging drivers are executed.

 * A number of e2e tests have been changed to force the k8s-file driver
   to make them pass when running inside a root container.

 * Increase the timeout in a kill test which seems to take longer now.
   Reasons are unknown.  Tests passed earlier and no signal-related
   changes happened.  It may be a CI VM flake since some system tests
   passed but others flaked.

Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
2021-05-12 17:56:59 +02:00

229 lines
5.5 KiB
Go

//+build linux
//+build systemd
package libpod
import (
"context"
"fmt"
"io"
"math"
"strings"
"time"
"github.com/containers/podman/v3/libpod/define"
"github.com/containers/podman/v3/libpod/logs"
journal "github.com/coreos/go-systemd/v22/sdjournal"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
// journaldLogOut is the journald PRIORITY value that marks an entry as
// container stdout
journaldLogOut = "6"
// journaldLogErr is the journald PRIORITY value that marks an entry as
// container stderr
journaldLogErr = "3"
// bufLen is the size in bytes of the buffer used to read one formatted
// (k8s-file style) log line from the journal reader
// let's set it as 16k just to be safe if k8s-file format ever changes
bufLen = 16384
)
// readFromJournal streams the journald entries of this container to
// logChannel as parsed log lines.
//
// The journal is filtered on the CONTAINER_ID_FULL field. options.Tail and
// options.Since restrict which entries are read, and options.Multi selects the
// formatter that also emits the (truncated) container ID. When options.Follow
// is set and the container is running, entries are followed until ctx is
// cancelled or the container exits; otherwise the currently available entries
// are read once.
//
// Reading happens in a background goroutine. The function adds one to
// options.WaitGroup only after the reader has been set up successfully, and
// the goroutine marks it done when it stops. An error is returned only when
// creating the reader or querying the container state fails; in that case the
// WaitGroup is left untouched so callers waiting on it cannot hang.
func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
	var config journal.JournalReaderConfig

	// Translate the requested tail into the reader's NumFromTail setting.
	switch {
	case options.Tail < 0:
		config.NumFromTail = 0
	case options.Tail == 0:
		config.NumFromTail = math.MaxUint64
	default:
		config.NumFromTail = uint64(options.Tail)
	}

	if options.Multi {
		config.Formatter = journalFormatterWithID
	} else {
		config.Formatter = journalFormatter
	}

	if !options.Since.IsZero() {
		// coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future
		// return nothing instead of falsely printing
		if time.Now().Before(options.Since) {
			return nil
		}
		// coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past
		config.Since = -time.Since(options.Since)
	}

	config.Matches = append(config.Matches, journal.Match{
		Field: "CONTAINER_ID_FULL",
		Value: c.ID(),
	})

	r, err := journal.NewJournalReader(config)
	if err != nil {
		return err
	}
	if r == nil {
		return errors.Errorf("journal reader creation failed")
	}

	if options.Tail == math.MaxInt64 {
		if err := r.Rewind(); err != nil {
			logrus.Errorf("rewinding journal reader: %v", err)
		}
	}

	state, err := c.State()
	if err != nil {
		// Do not leak the reader when we bail out early.
		r.Close()
		return err
	}

	// Only register with the WaitGroup once a goroutine is guaranteed to be
	// started; every error return above leaves the counter balanced.
	options.WaitGroup.Add(1)

	if options.Follow && state == define.ContainerStateRunning {
		go func() {
			defer options.WaitGroup.Done()
			defer r.Close()

			// done is closed once following has stopped so the context
			// watcher below can exit; closing never blocks.
			done := make(chan struct{})
			defer close(done)

			// until has room for both potential senders below, so neither
			// of them can block (and leak) after Follow has returned.
			until := make(chan time.Time, 2)

			go func() {
				select {
				case <-ctx.Done():
					until <- time.Time{}
				case <-done:
					// nothing to do anymore
				}
			}()
			go func() {
				// FIXME (#10323): we are facing a terrible
				// race condition here. At the time the
				// container dies and `c.Wait()` has returned,
				// we may not have received all journald logs.
				// So far there is no other way than waiting
				// for a second. Ultimately, `r.Follow` is
				// racy and we may have to implement our custom
				// logic here.
				c.Wait(ctx)
				time.Sleep(time.Second)
				until <- time.Time{}
			}()

			follower := journaldFollowBuffer{logChannel, options.Multi}
			if err := r.Follow(until, follower); err != nil {
				logrus.Debug(err)
			}
		}()
		return nil
	}

	go func() {
		defer options.WaitGroup.Done()
		defer r.Close()

		buf := make([]byte, bufLen)
		var (
			n       int
			readErr error
		)
		for {
			// Read at the top of every iteration so that skipping a bad
			// line still advances to the next entry.
			n, readErr = r.Read(buf)
			if n == 0 || readErr != nil {
				break
			}
			// because we are reusing buf, only convert the bytes that
			// were filled in by this read
			logLine, parseErr := logs.NewJournaldLogLine(string(buf[:n]), options.Multi)
			if parseErr != nil {
				logrus.Error(parseErr)
				continue
			}
			logChannel <- logLine
		}
		if readErr != nil && readErr != io.EOF {
			logrus.Error(readErr)
		}
	}()
	return nil
}
// journalFormatterWithID renders a journal entry as
// "<prefix><container ID> <message>", where the prefix comes from
// formatterPrefix and the container ID is the CONTAINER_ID_FULL field cut down
// to at most 12 characters. It fails if the prefix cannot be built or if the
// CONTAINER_ID_FULL or MESSAGE field is missing.
func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
	prefix, err := formatterPrefix(entry)
	if err != nil {
		return "", err
	}

	containerID, found := entry.Fields["CONTAINER_ID_FULL"]
	if !found {
		return "", fmt.Errorf("no CONTAINER_ID_FULL field present in journal entry")
	}
	// Only the short form of the ID is printed.
	if len(containerID) > 12 {
		containerID = containerID[:12]
	}

	message, err := formatterMessage(entry)
	if err != nil {
		return "", err
	}

	return prefix + containerID + " " + message, nil
}
// journalFormatter renders a journal entry as "<prefix><message>", where the
// prefix comes from formatterPrefix and the message from formatterMessage.
// Any error from either helper is returned unchanged.
func journalFormatter(entry *journal.JournalEntry) (string, error) {
	prefix, err := formatterPrefix(entry)
	if err != nil {
		return "", err
	}

	message, err := formatterMessage(entry)
	if err != nil {
		return "", err
	}

	return prefix + message, nil
}
// formatterPrefix builds the leading "<timestamp> <stream> <log type> " part
// of a formatted log line.
//
// The timestamp is the entry's realtime timestamp (microseconds) rendered with
// logs.LogTimeFormat. The stream is "stdout" or "stderr" depending on the
// PRIORITY field; a missing or unexpected priority is an error. The log type
// is logs.PartialLogType when CONTAINER_PARTIAL_MESSAGE is present and
// logs.FullLogType otherwise.
func formatterPrefix(entry *journal.JournalEntry) (string, error) {
	priority, found := entry.Fields["PRIORITY"]
	if !found {
		return "", errors.Errorf("no PRIORITY field present in journal entry")
	}

	var stream string
	switch priority {
	case journaldLogOut:
		stream = "stdout"
	case journaldLogErr:
		stream = "stderr"
	default:
		return "", errors.Errorf("unexpected PRIORITY field in journal entry")
	}

	nanos := int64(entry.RealtimeTimestamp) * int64(time.Microsecond)
	timestamp := time.Unix(0, nanos).Format(logs.LogTimeFormat)

	// if CONTAINER_PARTIAL_MESSAGE is defined, the log type is "P"
	if _, partial := entry.Fields["CONTAINER_PARTIAL_MESSAGE"]; partial {
		return fmt.Sprintf("%s %s %s ", timestamp, stream, logs.PartialLogType), nil
	}
	return fmt.Sprintf("%s %s %s ", timestamp, stream, logs.FullLogType), nil
}
// formatterMessage returns the entry's MESSAGE field with a single trailing
// newline removed, or an error if the entry has no MESSAGE field.
func formatterMessage(entry *journal.JournalEntry) (string, error) {
	raw, found := entry.Fields["MESSAGE"]
	if !found {
		return "", fmt.Errorf("no MESSAGE field present in journal entry")
	}
	return strings.TrimSuffix(raw, "\n"), nil
}
// journaldFollowBuffer is the writer readFromJournal hands to the journal
// reader's Follow call; each chunk written to it is parsed into a log line and
// sent on logChannel.
type journaldFollowBuffer struct {
// logChannel receives every successfully parsed log line.
logChannel chan *logs.LogLine
// withID is forwarded to logs.NewJournaldLogLine; readFromJournal sets it
// from options.Multi, i.e. when the formatter also emits the container ID.
withID bool
}
// Write implements io.Writer. It parses p as one formatted journald log line
// (with a container ID column when f.withID is set) and sends the result on
// f.logChannel, blocking until the line has been received.
//
// On success it reports len(p) bytes written. If p cannot be parsed, nothing
// is sent and the parse error is returned with a count of 0.
func (f journaldFollowBuffer) Write(p []byte) (int, error) {
	logLine, err := logs.NewJournaldLogLine(string(p), f.withID)
	if err != nil {
		// io.Writer requires 0 <= n <= len(p); a negative count is invalid
		// and can trip up generic io helpers.
		return 0, err
	}
	f.logChannel <- logLine
	return len(p), nil
}