mirror of
https://github.com/containers/podman.git
synced 2025-06-23 10:38:20 +08:00
Merge pull request #14984 from Luap99/logs
fix goroutine leaks in events and logs backend
This commit is contained in:
@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/containers/podman/v4/libpod/define"
|
"github.com/containers/podman/v4/libpod/define"
|
||||||
"github.com/containers/podman/v4/libpod/events"
|
"github.com/containers/podman/v4/libpod/events"
|
||||||
"github.com/containers/podman/v4/libpod/logs"
|
"github.com/containers/podman/v4/libpod/logs"
|
||||||
|
"github.com/nxadm/tail"
|
||||||
"github.com/nxadm/tail/watch"
|
"github.com/nxadm/tail/watch"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
@ -74,14 +75,19 @@ func (c *Container) readFromLogFile(ctx context.Context, options *logs.LogOption
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer options.WaitGroup.Done()
|
defer options.WaitGroup.Done()
|
||||||
|
var line *tail.Line
|
||||||
for line := range t.Lines {
|
var ok bool
|
||||||
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
// the consumer has cancelled
|
// the consumer has cancelled
|
||||||
|
t.Kill(errors.New("hangup by client"))
|
||||||
return
|
return
|
||||||
default:
|
case line, ok = <-t.Lines:
|
||||||
// fallthrough
|
if !ok {
|
||||||
|
// channel was closed
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
nll, err := logs.NewLogLine(line.Text)
|
nll, err := logs.NewLogLine(line.Text)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -178,8 +178,13 @@ func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOption
|
|||||||
if !options.Follow || !containerCouldBeLogging {
|
if !options.Follow || !containerCouldBeLogging {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Sleep until something's happening on the journal.
|
|
||||||
journal.Wait(sdjournal.IndefiniteWait)
|
// journal.Wait() is blocking, this would cause the goroutine to hang forever
|
||||||
|
// if no more journal entries are generated and thus if the client
|
||||||
|
// has closed the connection in the meantime to leak memory.
|
||||||
|
// Waiting only 5 seconds makes sure we can check if the client closed in the
|
||||||
|
// meantime at least every 5 seconds.
|
||||||
|
journal.Wait(5 * time.Second)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
lastReadCursor = cursor
|
lastReadCursor = cursor
|
||||||
|
@ -141,9 +141,18 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
|
|||||||
if !options.Stream || (len(options.Until) > 0 && time.Now().After(untilTime)) {
|
if !options.Stream || (len(options.Until) > 0 && time.Now().After(untilTime)) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
t := sdjournal.IndefiniteWait
|
|
||||||
|
// j.Wait() is blocking, this would cause the goroutine to hang forever
|
||||||
|
// if no more journal entries are generated and thus if the client
|
||||||
|
// has closed the connection in the meantime to leak memory.
|
||||||
|
// Waiting only 5 seconds makes sure we can check if the client closed in the
|
||||||
|
// meantime at least every 5 seconds.
|
||||||
|
t := 5 * time.Second
|
||||||
if len(options.Until) > 0 {
|
if len(options.Until) > 0 {
|
||||||
t = time.Until(untilTime)
|
until := time.Until(untilTime)
|
||||||
|
if until < t {
|
||||||
|
t = until
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_ = j.Wait(t)
|
_ = j.Wait(t)
|
||||||
continue
|
continue
|
||||||
|
@ -108,23 +108,19 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
funcDone := make(chan bool)
|
var line *tail.Line
|
||||||
copy := true
|
var ok bool
|
||||||
go func() {
|
for {
|
||||||
select {
|
|
||||||
case <-funcDone:
|
|
||||||
// Do nothing
|
|
||||||
case <-ctx.Done():
|
|
||||||
copy = false
|
|
||||||
t.Kill(errors.New("hangup by client"))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
for line := range t.Lines {
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
// the consumer has cancelled
|
// the consumer has cancelled
|
||||||
|
t.Kill(errors.New("hangup by client"))
|
||||||
return nil
|
return nil
|
||||||
default:
|
case line, ok = <-t.Lines:
|
||||||
|
if !ok {
|
||||||
|
// channel was closed
|
||||||
|
return nil
|
||||||
|
}
|
||||||
// fallthrough
|
// fallthrough
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -138,12 +134,10 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
|
|||||||
default:
|
default:
|
||||||
return fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath)
|
return fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath)
|
||||||
}
|
}
|
||||||
if copy && applyFilters(event, filterMap) {
|
if applyFilters(event, filterMap) {
|
||||||
options.EventChannel <- event
|
options.EventChannel <- event
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
funcDone <- true
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// String returns a string representation of the logger
|
// String returns a string representation of the logger
|
||||||
|
Reference in New Issue
Block a user