mirror of
https://github.com/containers/podman.git
synced 2025-07-15 03:02:52 +08:00

Fix a race in journald driver. Following the logs implies streaming until the container is dead. Streaming happened in one goroutine, while waiting for the container to exit/die and signaling that event happened in another goroutine. Having two goroutines running simultaneously is the core of the race condition: when the streaming goroutine received the signal that the container had exited, it might not yet have read and written all of the container's logs. Fix this race by reading both the logs and the events of the container and stopping streaming once the died/exited event has been read. The died event is guaranteed to come after all logs in the journal, which guarantees not only consistency but also deterministic behavior. Note that the journald log driver now requires the journald event backend to be set. Fixes: #10323 Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
277 lines
7.2 KiB
Go
277 lines
7.2 KiB
Go
//+build linux
|
|
//+build systemd
|
|
|
|
package libpod
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/containers/podman/v3/libpod/events"
|
|
"github.com/containers/podman/v3/libpod/logs"
|
|
"github.com/coreos/go-systemd/v22/sdjournal"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Journald PRIORITY values that the journald log driver uses to record which
// container stream a log line was written to.
const (
	journaldLogOut = "6" // PRIORITY of a line written to stdout
	journaldLogErr = "3" // PRIORITY of a line written to stderr
)
|
|
|
|
func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
|
|
journal, err := sdjournal.NewJournal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// While logs are written to the `logChannel`, we inspect each event
|
|
// and stop once the container has died. Having logs and events in one
|
|
// stream prevents a race condition that we faced in #10323.
|
|
|
|
// Add the filters for events.
|
|
match := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
|
|
if err := journal.AddMatch(match.String()); err != nil {
|
|
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
|
|
}
|
|
match = sdjournal.Match{Field: "PODMAN_ID", Value: c.ID()}
|
|
if err := journal.AddMatch(match.String()); err != nil {
|
|
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
|
|
}
|
|
|
|
// Add the filter for logs. Note the disjunction so that we match
|
|
// either the events or the logs.
|
|
if err := journal.AddDisjunction(); err != nil {
|
|
return errors.Wrap(err, "adding filter disjunction to journald logger")
|
|
}
|
|
match = sdjournal.Match{Field: "CONTAINER_ID_FULL", Value: c.ID()}
|
|
if err := journal.AddMatch(match.String()); err != nil {
|
|
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
|
|
}
|
|
|
|
if err := journal.SeekHead(); err != nil {
|
|
return err
|
|
}
|
|
// API requires Next() immediately after SeekHead().
|
|
if _, err := journal.Next(); err != nil {
|
|
return errors.Wrap(err, "initial journal cursor")
|
|
}
|
|
|
|
// API requires a next|prev before getting a cursor.
|
|
if _, err := journal.Previous(); err != nil {
|
|
return errors.Wrap(err, "initial journal cursor")
|
|
}
|
|
|
|
// Note that the initial cursor may not yet be ready, so we'll do an
|
|
// exponential backoff.
|
|
var cursor string
|
|
var cursorError error
|
|
for i := 1; i <= 3; i++ {
|
|
cursor, cursorError = journal.GetCursor()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
time.Sleep(time.Duration(i*100) * time.Millisecond)
|
|
break
|
|
}
|
|
if cursorError != nil {
|
|
return errors.Wrap(cursorError, "inital journal cursor")
|
|
}
|
|
|
|
// We need the container's events in the same journal to guarantee
|
|
// consistency, see #10323.
|
|
if options.Follow && c.runtime.config.Engine.EventsLogger != "journald" {
|
|
return errors.Errorf("using --follow with the journald --log-driver but without the journald --events-backend (%s) is not supported", c.runtime.config.Engine.EventsLogger)
|
|
}
|
|
|
|
options.WaitGroup.Add(1)
|
|
go func() {
|
|
defer func() {
|
|
options.WaitGroup.Done()
|
|
if err := journal.Close(); err != nil {
|
|
logrus.Errorf("Unable to close journal: %v", err)
|
|
}
|
|
}()
|
|
|
|
afterTimeStamp := false // needed for options.Since
|
|
tailQueue := []*logs.LogLine{} // needed for options.Tail
|
|
doTail := options.Tail > 0
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
// Remote client may have closed/lost the connection.
|
|
return
|
|
default:
|
|
// Fallthrough
|
|
}
|
|
|
|
if _, err := journal.Next(); err != nil {
|
|
logrus.Errorf("Failed to move journal cursor to next entry: %v", err)
|
|
return
|
|
}
|
|
latestCursor, err := journal.GetCursor()
|
|
if err != nil {
|
|
logrus.Errorf("Failed to get journal cursor: %v", err)
|
|
return
|
|
}
|
|
|
|
// Hit the end of the journal.
|
|
if cursor == latestCursor {
|
|
if doTail {
|
|
// Flush *once* we hit the end of the journal.
|
|
startIndex := int64(len(tailQueue)-1) - options.Tail
|
|
if startIndex < 0 {
|
|
startIndex = 0
|
|
}
|
|
for i := startIndex; i < int64(len(tailQueue)); i++ {
|
|
logChannel <- tailQueue[i]
|
|
}
|
|
tailQueue = nil
|
|
doTail = false
|
|
}
|
|
// Unless we follow, quit.
|
|
if !options.Follow {
|
|
return
|
|
}
|
|
// Sleep until something's happening on the journal.
|
|
journal.Wait(sdjournal.IndefiniteWait)
|
|
continue
|
|
}
|
|
cursor = latestCursor
|
|
|
|
entry, err := journal.GetEntry()
|
|
if err != nil {
|
|
logrus.Errorf("Failed to get journal entry: %v", err)
|
|
return
|
|
}
|
|
|
|
if !afterTimeStamp {
|
|
entryTime := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
|
|
if entryTime.Before(options.Since) {
|
|
continue
|
|
}
|
|
afterTimeStamp = true
|
|
}
|
|
|
|
// If we're reading an event and the container exited/died,
|
|
// then we're done and can return.
|
|
event, ok := entry.Fields["PODMAN_EVENT"]
|
|
if ok {
|
|
status, err := events.StringToStatus(event)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to translate event: %v", err)
|
|
return
|
|
}
|
|
if status == events.Exited {
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
|
|
var message string
|
|
var formatError error
|
|
|
|
if options.Multi {
|
|
message, formatError = journalFormatterWithID(entry)
|
|
} else {
|
|
message, formatError = journalFormatter(entry)
|
|
}
|
|
|
|
if formatError != nil {
|
|
logrus.Errorf("Failed to parse journald log entry: %v", err)
|
|
return
|
|
}
|
|
|
|
logLine, err := logs.NewJournaldLogLine(message, options.Multi)
|
|
if err != nil {
|
|
logrus.Errorf("Failed parse log line: %v", err)
|
|
return
|
|
}
|
|
if doTail {
|
|
tailQueue = append(tailQueue, logLine)
|
|
continue
|
|
}
|
|
logChannel <- logLine
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func journalFormatterWithID(entry *sdjournal.JournalEntry) (string, error) {
|
|
output, err := formatterPrefix(entry)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
id, ok := entry.Fields["CONTAINER_ID_FULL"]
|
|
if !ok {
|
|
return "", fmt.Errorf("no CONTAINER_ID_FULL field present in journal entry")
|
|
}
|
|
if len(id) > 12 {
|
|
id = id[:12]
|
|
}
|
|
output += fmt.Sprintf("%s ", id)
|
|
// Append message
|
|
msg, err := formatterMessage(entry)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
output += msg
|
|
return output, nil
|
|
}
|
|
|
|
func journalFormatter(entry *sdjournal.JournalEntry) (string, error) {
|
|
output, err := formatterPrefix(entry)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
// Append message
|
|
msg, err := formatterMessage(entry)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
output += msg
|
|
return output, nil
|
|
}
|
|
|
|
func formatterPrefix(entry *sdjournal.JournalEntry) (string, error) {
|
|
usec := entry.RealtimeTimestamp
|
|
tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat)
|
|
output := fmt.Sprintf("%s ", tsString)
|
|
priority, ok := entry.Fields["PRIORITY"]
|
|
if !ok {
|
|
return "", errors.Errorf("no PRIORITY field present in journal entry")
|
|
}
|
|
if priority == journaldLogOut {
|
|
output += "stdout "
|
|
} else if priority == journaldLogErr {
|
|
output += "stderr "
|
|
} else {
|
|
return "", errors.Errorf("unexpected PRIORITY field in journal entry")
|
|
}
|
|
|
|
// if CONTAINER_PARTIAL_MESSAGE is defined, the log type is "P"
|
|
if _, ok := entry.Fields["CONTAINER_PARTIAL_MESSAGE"]; ok {
|
|
output += fmt.Sprintf("%s ", logs.PartialLogType)
|
|
} else {
|
|
output += fmt.Sprintf("%s ", logs.FullLogType)
|
|
}
|
|
|
|
return output, nil
|
|
}
|
|
|
|
func formatterMessage(entry *sdjournal.JournalEntry) (string, error) {
|
|
// Finally, append the message
|
|
msg, ok := entry.Fields["MESSAGE"]
|
|
if !ok {
|
|
return "", fmt.Errorf("no MESSAGE field present in journal entry")
|
|
}
|
|
msg = strings.TrimSuffix(msg, "\n")
|
|
return msg, nil
|
|
}
|