Files
podman/libpod/events/journal_linux.go
Paul Holzinger 4e72aa5860 fix goroutine leaks in events and logs backend
When running a single podman logs this is not really important since we
will exit when we finish reading the logs. However for the system
service this is very important. Leaking goroutines will cause an
increased memory and CPU ussage over time.

Both the the event and log backend have goroutine leaks with both the
file and journald drivers.

The journald backend has the problem that journal.Wait(IndefiniteWait)
will block until we get a new journald event. So when a client closes
the connection the goroutine would still wait until there is a new
journal entry. To fix this we just wait for a maximum of 5 seconds,
after that we can check if the client connection was closed and exit
correctly in this case.

For the file backend we can fix this by waiting for either the log line
or context cancel at the same time. Currently it would block waiting for
new log lines and only check afterwards if the client closed the
connection and thus hang forever if there are no new log lines.

[NO NEW TESTS NEEDED] I am open to ideas how we can test memory leaks in
CI.
To test manually run a container like this:
`podman run --log-driver $driver  --name test -d alpine sh -c 'i=1; while [ "$i" -ne 1000 ]; do echo "line $i"; i=$((i + 1)); done; sleep inf'`
where `$driver` can be either `journald` or `k8s-file`.
Then start the podman system service and use:
`curl -m 1 --output -  --unix-socket $XDG_RUNTIME_DIR/podman/podman.sock -v 'http://d/containers/test/logs?follow=1&since=0&stderr=1&stdout=1' &>/dev/null`
to get the logs from the API and then it closes the connection after 1 second.
Now run the curl command several times and check the memory usage of the service.

Fixes #14879

Signed-off-by: Paul Holzinger <pholzing@redhat.com>
2022-07-20 12:55:34 +02:00

241 lines
6.6 KiB
Go

//go:build systemd
// +build systemd
package events
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"github.com/containers/podman/v4/pkg/util"
"github.com/coreos/go-systemd/v22/journal"
"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/sirupsen/logrus"
)
// DefaultEventerType is journald when systemd is available
const DefaultEventerType = Journald
// EventJournalD is the journald implementation of an eventer
type EventJournalD struct {
options EventerOptions
}
// newEventJournalD creates a new journald Eventer
func newEventJournalD(options EventerOptions) (Eventer, error) {
return EventJournalD{options}, nil
}
// Write to journald
func (e EventJournalD) Write(ee Event) error {
m := make(map[string]string)
m["SYSLOG_IDENTIFIER"] = "podman"
m["PODMAN_EVENT"] = ee.Status.String()
m["PODMAN_TYPE"] = ee.Type.String()
m["PODMAN_TIME"] = ee.Time.Format(time.RFC3339Nano)
// Add specialized information based on the podman type
switch ee.Type {
case Image:
m["PODMAN_NAME"] = ee.Name
m["PODMAN_ID"] = ee.ID
case Container, Pod:
m["PODMAN_IMAGE"] = ee.Image
m["PODMAN_NAME"] = ee.Name
m["PODMAN_ID"] = ee.ID
if ee.ContainerExitCode != 0 {
m["PODMAN_EXIT_CODE"] = strconv.Itoa(ee.ContainerExitCode)
}
// If we have container labels, we need to convert them to a string so they
// can be recorded with the event
if len(ee.Details.Attributes) > 0 {
b, err := json.Marshal(ee.Details.Attributes)
if err != nil {
return err
}
m["PODMAN_LABELS"] = string(b)
}
m["PODMAN_HEALTH_STATUS"] = ee.HealthStatus
case Network:
m["PODMAN_ID"] = ee.ID
m["PODMAN_NETWORK_NAME"] = ee.Network
case Volume:
m["PODMAN_NAME"] = ee.Name
}
return journal.Send(ee.ToHumanReadable(false), journal.PriInfo, m)
}
// Read reads events from the journal and sends qualified events to the event channel
func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel)
filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until)
if err != nil {
return fmt.Errorf("failed to parse event filters: %w", err)
}
var untilTime time.Time
if len(options.Until) > 0 {
untilTime, err = util.ParseInputTime(options.Until, false)
if err != nil {
return err
}
}
j, err := sdjournal.NewJournal()
if err != nil {
return err
}
defer func() {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
}
}()
// match only podman journal entries
podmanJournal := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
if err := j.AddMatch(podmanJournal.String()); err != nil {
return fmt.Errorf("failed to add journal filter for event log: %w", err)
}
if len(options.Since) == 0 && len(options.Until) == 0 && options.Stream {
if err := j.SeekTail(); err != nil {
return fmt.Errorf("failed to seek end of journal: %w", err)
}
// After SeekTail calling Next moves to a random entry.
// To prevent this we have to call Previous first.
// see: https://bugs.freedesktop.org/show_bug.cgi?id=64614
if _, err := j.Previous(); err != nil {
return fmt.Errorf("failed to move journal cursor to previous entry: %w", err)
}
}
// the api requires a next|prev before getting a cursor
if _, err := j.Next(); err != nil {
return fmt.Errorf("failed to move journal cursor to next entry: %w", err)
}
prevCursor, err := j.GetCursor()
if err != nil {
return fmt.Errorf("failed to get journal cursor: %w", err)
}
for {
select {
case <-ctx.Done():
// the consumer has cancelled
return nil
default:
// fallthrough
}
if _, err := j.Next(); err != nil {
return fmt.Errorf("failed to move journal cursor to next entry: %w", err)
}
newCursor, err := j.GetCursor()
if err != nil {
return fmt.Errorf("failed to get journal cursor: %w", err)
}
if prevCursor == newCursor {
if !options.Stream || (len(options.Until) > 0 && time.Now().After(untilTime)) {
break
}
// j.Wait() is blocking, this would cause the goroutine to hang forever
// if no more journal entries are generated and thus if the client
// has closed the connection in the meantime to leak memory.
// Waiting only 5 seconds makes sure we can check if the client closed in the
// meantime at least every 5 seconds.
t := 5 * time.Second
if len(options.Until) > 0 {
until := time.Until(untilTime)
if until < t {
t = until
}
}
_ = j.Wait(t)
continue
}
prevCursor = newCursor
entry, err := j.GetEntry()
if err != nil {
return fmt.Errorf("failed to read journal entry: %w", err)
}
newEvent, err := newEventFromJournalEntry(entry)
if err != nil {
// We can't decode this event.
// Don't fail hard - that would make events unusable.
// Instead, log and continue.
if !errors.Is(err, ErrEventTypeBlank) {
logrus.Errorf("Unable to decode event: %v", err)
}
continue
}
if applyFilters(newEvent, filterMap) {
options.EventChannel <- newEvent
}
}
return nil
}
func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) {
newEvent := Event{}
eventType, err := StringToType(entry.Fields["PODMAN_TYPE"])
if err != nil {
return nil, err
}
eventTime, err := time.Parse(time.RFC3339Nano, entry.Fields["PODMAN_TIME"])
if err != nil {
return nil, err
}
eventStatus, err := StringToStatus(entry.Fields["PODMAN_EVENT"])
if err != nil {
return nil, err
}
newEvent.Type = eventType
newEvent.Time = eventTime
newEvent.Status = eventStatus
newEvent.Name = entry.Fields["PODMAN_NAME"]
switch eventType {
case Container, Pod:
newEvent.ID = entry.Fields["PODMAN_ID"]
newEvent.Image = entry.Fields["PODMAN_IMAGE"]
if code, ok := entry.Fields["PODMAN_EXIT_CODE"]; ok {
intCode, err := strconv.Atoi(code)
if err != nil {
logrus.Errorf("Parsing event exit code %s", code)
} else {
newEvent.ContainerExitCode = intCode
}
}
// we need to check for the presence of labels recorded to a container event
if stringLabels, ok := entry.Fields["PODMAN_LABELS"]; ok && len(stringLabels) > 0 {
labels := make(map[string]string, 0)
if err := json.Unmarshal([]byte(stringLabels), &labels); err != nil {
return nil, err
}
// if we have labels, add them to the event
if len(labels) > 0 {
newEvent.Details = Details{Attributes: labels}
}
}
newEvent.HealthStatus = entry.Fields["PODMAN_HEALTH_STATUS"]
case Network:
newEvent.ID = entry.Fields["PODMAN_ID"]
newEvent.Network = entry.Fields["PODMAN_NETWORK_NAME"]
case Image:
newEvent.ID = entry.Fields["PODMAN_ID"]
}
return &newEvent, nil
}
// String returns a string representation of the logger
func (e EventJournalD) String() string {
return Journald.String()
}