mirror of
https://github.com/containers/podman.git
synced 2025-07-02 16:57:24 +08:00
log API: add context to allow for cancelling
Add a `context.Context` to the log APIs to allow for cancelling streaming (e.g., via `podman logs -f`). This fixes issues for the remote API where some go routines of the server will continue writing and produce nothing but heat and waste CPU cycles. Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
This commit is contained in:
@ -353,7 +353,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
|
|||||||
logOpts.WaitGroup.Wait()
|
logOpts.WaitGroup.Wait()
|
||||||
close(logChan)
|
close(logChan)
|
||||||
}()
|
}()
|
||||||
if err := c.ReadLog(logOpts, logChan); err != nil {
|
if err := c.ReadLog(context.Background(), logOpts, logChan); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
logrus.Debugf("Done reading logs for container %s, %d bytes", c.ID(), logSize)
|
logrus.Debugf("Done reading logs for container %s, %d bytes", c.ID(), logSize)
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package libpod
|
package libpod
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
@ -13,9 +14,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Log is a runtime function that can read one or more container logs.
|
// Log is a runtime function that can read one or more container logs.
|
||||||
func (r *Runtime) Log(containers []*Container, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
|
func (r *Runtime) Log(ctx context.Context, containers []*Container, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
|
||||||
for _, ctr := range containers {
|
for _, ctr := range containers {
|
||||||
if err := ctr.ReadLog(options, logChannel); err != nil {
|
if err := ctr.ReadLog(ctx, options, logChannel); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -23,25 +24,25 @@ func (r *Runtime) Log(containers []*Container, options *logs.LogOptions, logChan
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ReadLog reads a containers log based on the input options and returns loglines over a channel.
|
// ReadLog reads a containers log based on the input options and returns loglines over a channel.
|
||||||
func (c *Container) ReadLog(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
|
func (c *Container) ReadLog(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
|
||||||
switch c.LogDriver() {
|
switch c.LogDriver() {
|
||||||
case define.NoLogging:
|
case define.NoLogging:
|
||||||
return errors.Wrapf(define.ErrNoLogs, "this container is using the 'none' log driver, cannot read logs")
|
return errors.Wrapf(define.ErrNoLogs, "this container is using the 'none' log driver, cannot read logs")
|
||||||
case define.JournaldLogging:
|
case define.JournaldLogging:
|
||||||
// TODO Skip sending logs until journald logs can be read
|
// TODO Skip sending logs until journald logs can be read
|
||||||
return c.readFromJournal(options, logChannel)
|
return c.readFromJournal(ctx, options, logChannel)
|
||||||
case define.JSONLogging:
|
case define.JSONLogging:
|
||||||
// TODO provide a separate implementation of this when Conmon
|
// TODO provide a separate implementation of this when Conmon
|
||||||
// has support.
|
// has support.
|
||||||
fallthrough
|
fallthrough
|
||||||
case define.KubernetesLogging, "":
|
case define.KubernetesLogging, "":
|
||||||
return c.readFromLogFile(options, logChannel)
|
return c.readFromLogFile(ctx, options, logChannel)
|
||||||
default:
|
default:
|
||||||
return errors.Wrapf(define.ErrInternal, "unrecognized log driver %q, cannot read logs", c.LogDriver())
|
return errors.Wrapf(define.ErrInternal, "unrecognized log driver %q, cannot read logs", c.LogDriver())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
|
func (c *Container) readFromLogFile(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
|
||||||
t, tailLog, err := logs.GetLogFile(c.LogPath(), options)
|
t, tailLog, err := logs.GetLogFile(c.LogPath(), options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If the log file does not exist, this is not fatal.
|
// If the log file does not exist, this is not fatal.
|
||||||
@ -62,8 +63,17 @@ func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *l
|
|||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer options.WaitGroup.Done()
|
||||||
|
|
||||||
var partial string
|
var partial string
|
||||||
for line := range t.Lines {
|
for line := range t.Lines {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// the consumer has cancelled
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
// fallthrough
|
||||||
|
}
|
||||||
nll, err := logs.NewLogLine(line.Text)
|
nll, err := logs.NewLogLine(line.Text)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Error(err)
|
logrus.Error(err)
|
||||||
@ -82,7 +92,6 @@ func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *l
|
|||||||
logChannel <- nll
|
logChannel <- nll
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
options.WaitGroup.Done()
|
|
||||||
}()
|
}()
|
||||||
// Check if container is still running or paused
|
// Check if container is still running or paused
|
||||||
if options.Follow {
|
if options.Follow {
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
package libpod
|
package libpod
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
@ -29,7 +30,7 @@ const (
|
|||||||
bufLen = 16384
|
bufLen = 16384
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
|
func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
|
||||||
var config journal.JournalReaderConfig
|
var config journal.JournalReaderConfig
|
||||||
if options.Tail < 0 {
|
if options.Tail < 0 {
|
||||||
config.NumFromTail = math.MaxUint64
|
config.NumFromTail = math.MaxUint64
|
||||||
@ -65,13 +66,24 @@ func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *l
|
|||||||
|
|
||||||
if options.Follow {
|
if options.Follow {
|
||||||
go func() {
|
go func() {
|
||||||
|
done := make(chan bool)
|
||||||
|
until := make(chan time.Time)
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
until <- time.Time{}
|
||||||
|
case <-done:
|
||||||
|
// nothing to do anymore
|
||||||
|
}
|
||||||
|
}()
|
||||||
follower := FollowBuffer{logChannel}
|
follower := FollowBuffer{logChannel}
|
||||||
err := r.Follow(nil, follower)
|
err := r.Follow(until, follower)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Debugf(err.Error())
|
logrus.Debugf(err.Error())
|
||||||
}
|
}
|
||||||
r.Close()
|
r.Close()
|
||||||
options.WaitGroup.Done()
|
options.WaitGroup.Done()
|
||||||
|
done <- true
|
||||||
return
|
return
|
||||||
}()
|
}()
|
||||||
return nil
|
return nil
|
||||||
|
@ -3,11 +3,13 @@
|
|||||||
package libpod
|
package libpod
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"github.com/containers/libpod/v2/libpod/define"
|
"github.com/containers/libpod/v2/libpod/define"
|
||||||
"github.com/containers/libpod/v2/libpod/logs"
|
"github.com/containers/libpod/v2/libpod/logs"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
|
func (c *Container) readFromJournal(_ context.Context, _ *logs.LogOptions, _ chan *logs.LogLine) error {
|
||||||
return errors.Wrapf(define.ErrOSNotSupported, "Journald logging only enabled with systemd on linux")
|
return errors.Wrapf(define.ErrOSNotSupported, "Journald logging only enabled with systemd on linux")
|
||||||
}
|
}
|
||||||
|
@ -92,7 +92,7 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) {
|
|||||||
options.WaitGroup = &wg
|
options.WaitGroup = &wg
|
||||||
|
|
||||||
logChannel := make(chan *logs.LogLine, tail+1)
|
logChannel := make(chan *logs.LogLine, tail+1)
|
||||||
if err := runtime.Log([]*libpod.Container{ctnr}, options, logChannel); err != nil {
|
if err := runtime.Log(r.Context(), []*libpod.Container{ctnr}, options, logChannel); err != nil {
|
||||||
utils.InternalServerError(w, errors.Wrapf(err, "Failed to obtain logs for Container '%s'", name))
|
utils.InternalServerError(w, errors.Wrapf(err, "Failed to obtain logs for Container '%s'", name))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -105,50 +105,48 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
var frame strings.Builder
|
var frame strings.Builder
|
||||||
header := make([]byte, 8)
|
header := make([]byte, 8)
|
||||||
for ok := true; ok; ok = query.Follow {
|
for line := range logChannel {
|
||||||
for line := range logChannel {
|
if _, found := r.URL.Query()["until"]; found {
|
||||||
if _, found := r.URL.Query()["until"]; found {
|
if line.Time.After(until) {
|
||||||
if line.Time.After(until) {
|
break
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Reset buffer we're ready to loop again
|
// Reset buffer we're ready to loop again
|
||||||
frame.Reset()
|
frame.Reset()
|
||||||
switch line.Device {
|
switch line.Device {
|
||||||
case "stdout":
|
case "stdout":
|
||||||
if !query.Stdout {
|
if !query.Stdout {
|
||||||
continue
|
|
||||||
}
|
|
||||||
header[0] = 1
|
|
||||||
case "stderr":
|
|
||||||
if !query.Stderr {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
header[0] = 2
|
|
||||||
default:
|
|
||||||
// Logging and moving on is the best we can do here. We may have already sent
|
|
||||||
// a Status and Content-Type to client therefore we can no longer report an error.
|
|
||||||
log.Infof("unknown Device type '%s' in log file from Container %s", line.Device, ctnr.ID())
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
header[0] = 1
|
||||||
|
case "stderr":
|
||||||
|
if !query.Stderr {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
header[0] = 2
|
||||||
|
default:
|
||||||
|
// Logging and moving on is the best we can do here. We may have already sent
|
||||||
|
// a Status and Content-Type to client therefore we can no longer report an error.
|
||||||
|
log.Infof("unknown Device type '%s' in log file from Container %s", line.Device, ctnr.ID())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if query.Timestamps {
|
if query.Timestamps {
|
||||||
frame.WriteString(line.Time.Format(time.RFC3339))
|
frame.WriteString(line.Time.Format(time.RFC3339))
|
||||||
frame.WriteString(" ")
|
frame.WriteString(" ")
|
||||||
}
|
}
|
||||||
frame.WriteString(line.Msg)
|
frame.WriteString(line.Msg)
|
||||||
|
|
||||||
binary.BigEndian.PutUint32(header[4:], uint32(frame.Len()))
|
binary.BigEndian.PutUint32(header[4:], uint32(frame.Len()))
|
||||||
if _, err := w.Write(header[0:8]); err != nil {
|
if _, err := w.Write(header[0:8]); err != nil {
|
||||||
log.Errorf("unable to write log output header: %q", err)
|
log.Errorf("unable to write log output header: %q", err)
|
||||||
}
|
}
|
||||||
if _, err := io.WriteString(w, frame.String()); err != nil {
|
if _, err := io.WriteString(w, frame.String()); err != nil {
|
||||||
log.Errorf("unable to write frame string: %q", err)
|
log.Errorf("unable to write frame string: %q", err)
|
||||||
}
|
}
|
||||||
if flusher, ok := w.(http.Flusher); ok {
|
if flusher, ok := w.(http.Flusher); ok {
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -924,7 +924,7 @@ func (ic *ContainerEngine) ContainerLogs(ctx context.Context, containers []strin
|
|||||||
}
|
}
|
||||||
logChannel := make(chan *logs.LogLine, chSize)
|
logChannel := make(chan *logs.LogLine, chSize)
|
||||||
|
|
||||||
if err := ic.Libpod.Log(ctrs, logOpts, logChannel); err != nil {
|
if err := ic.Libpod.Log(ctx, ctrs, logOpts, logChannel); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -754,7 +754,7 @@ func (i *VarlinkAPI) GetContainersLogs(call iopodman.VarlinkCall, names []string
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return call.ReplyErrorOccurred(err.Error())
|
return call.ReplyErrorOccurred(err.Error())
|
||||||
}
|
}
|
||||||
if err := i.Runtime.Log(containers, &options, logChannel); err != nil {
|
if err := i.Runtime.Log(getContext(), containers, &options, logChannel); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
|
Reference in New Issue
Block a user