Fix system service panic from early hangup in events

We weren't actually halting the goroutine that sent events, so it
would continue sending even when the channel closed (the most
notable cause being early hangup - e.g. Control-c on a curl
session). Use a context to cancel the events goroutine and stop
sending events.

Fixes #6805

Signed-off-by: Matthew Heon <matthew.heon@pm.me>
This commit is contained in:
Matthew Heon
2020-07-01 16:21:57 -04:00
parent 1a1e3f4b24
commit 9e4cf6ca51
10 changed files with 43 additions and 17 deletions

View File

@ -1,6 +1,7 @@
package libpod package libpod
import ( import (
"context"
"fmt" "fmt"
"github.com/containers/libpod/libpod/events" "github.com/containers/libpod/libpod/events"
@ -75,16 +76,16 @@ func (v *Volume) newVolumeEvent(status events.Status) {
// Events is a wrapper function for everyone to begin tailing the events log // Events is a wrapper function for everyone to begin tailing the events log
// with options // with options
func (r *Runtime) Events(options events.ReadOptions) error { func (r *Runtime) Events(ctx context.Context, options events.ReadOptions) error {
eventer, err := r.newEventer() eventer, err := r.newEventer()
if err != nil { if err != nil {
return err return err
} }
return eventer.Read(options) return eventer.Read(ctx, options)
} }
// GetEvents reads the event log and returns events based on input filters // GetEvents reads the event log and returns events based on input filters
func (r *Runtime) GetEvents(filters []string) ([]*events.Event, error) { func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Event, error) {
var readErr error var readErr error
eventChannel := make(chan *events.Event) eventChannel := make(chan *events.Event)
options := events.ReadOptions{ options := events.ReadOptions{
@ -98,7 +99,7 @@ func (r *Runtime) GetEvents(filters []string) ([]*events.Event, error) {
return nil, err return nil, err
} }
go func() { go func() {
readErr = eventer.Read(options) readErr = eventer.Read(ctx, options)
}() }()
if readErr != nil { if readErr != nil {
return nil, readErr return nil, readErr
@ -112,7 +113,7 @@ func (r *Runtime) GetEvents(filters []string) ([]*events.Event, error) {
// GetLastContainerEvent takes a container name or ID and an event status and returns // GetLastContainerEvent takes a container name or ID and an event status and returns
// the last occurrence of the container event // the last occurrence of the container event
func (r *Runtime) GetLastContainerEvent(nameOrID string, containerEvent events.Status) (*events.Event, error) { func (r *Runtime) GetLastContainerEvent(ctx context.Context, nameOrID string, containerEvent events.Status) (*events.Event, error) {
// check to make sure the event.Status is valid // check to make sure the event.Status is valid
if _, err := events.StringToStatus(containerEvent.String()); err != nil { if _, err := events.StringToStatus(containerEvent.String()); err != nil {
return nil, err return nil, err
@ -122,7 +123,7 @@ func (r *Runtime) GetLastContainerEvent(nameOrID string, containerEvent events.S
fmt.Sprintf("event=%s", containerEvent), fmt.Sprintf("event=%s", containerEvent),
"type=container", "type=container",
} }
containerEvents, err := r.GetEvents(filters) containerEvents, err := r.GetEvents(ctx, filters)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,6 +1,7 @@
package events package events
import ( import (
"context"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -52,7 +53,7 @@ type Eventer interface {
// Write an event to a backend // Write an event to a backend
Write(event Event) error Write(event Event) error
// Read an event from the backend // Read an event from the backend
Read(options ReadOptions) error Read(ctx context.Context, options ReadOptions) error
// String returns the type of event logger // String returns the type of event logger
String() string String() string
} }

View File

@ -3,6 +3,7 @@
package events package events
import ( import (
"context"
"fmt" "fmt"
"strconv" "strconv"
"time" "time"
@ -53,7 +54,7 @@ func (e EventJournalD) Write(ee Event) error {
} }
// Read reads events from the journal and sends qualified events to the event channel // Read reads events from the journal and sends qualified events to the event channel
func (e EventJournalD) Read(options ReadOptions) error { func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel) defer close(options.EventChannel)
eventOptions, err := generateEventOptions(options.Filters, options.Since, options.Until) eventOptions, err := generateEventOptions(options.Filters, options.Since, options.Until)
if err != nil { if err != nil {

View File

@ -1,6 +1,7 @@
package events package events
import ( import (
"context"
"fmt" "fmt"
"os" "os"
@ -40,7 +41,7 @@ func (e EventLogFile) Write(ee Event) error {
} }
// Reads from the log file // Reads from the log file
func (e EventLogFile) Read(options ReadOptions) error { func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel) defer close(options.EventChannel)
eventOptions, err := generateEventOptions(options.Filters, options.Since, options.Until) eventOptions, err := generateEventOptions(options.Filters, options.Since, options.Until)
if err != nil { if err != nil {
@ -50,6 +51,17 @@ func (e EventLogFile) Read(options ReadOptions) error {
if err != nil { if err != nil {
return err return err
} }
funcDone := make(chan bool)
copy := true
go func() {
select {
case <-funcDone:
// Do nothing
case <-ctx.Done():
copy = false
t.Kill(errors.New("hangup by client"))
}
}()
for line := range t.Lines { for line := range t.Lines {
event, err := newEventFromJSONString(line.Text) event, err := newEventFromJSONString(line.Text)
if err != nil { if err != nil {
@ -65,10 +77,11 @@ func (e EventLogFile) Read(options ReadOptions) error {
for _, filter := range eventOptions { for _, filter := range eventOptions {
include = include && filter(event) include = include && filter(event)
} }
if include { if include && copy {
options.EventChannel <- event options.EventChannel <- event
} }
} }
funcDone <- true
return nil return nil
} }

View File

@ -1,5 +1,9 @@
package events package events
import (
"context"
)
// EventToNull is an eventer type that only performs write operations // EventToNull is an eventer type that only performs write operations
// and only writes to /dev/null. It is meant for unittests only // and only writes to /dev/null. It is meant for unittests only
type EventToNull struct{} type EventToNull struct{}
@ -10,7 +14,7 @@ func (e EventToNull) Write(ee Event) error {
} }
// Read does nothing. Do not use it. // Read does nothing. Do not use it.
func (e EventToNull) Read(options ReadOptions) error { func (e EventToNull) Read(ctx context.Context, options ReadOptions) error {
return nil return nil
} }

View File

@ -1,6 +1,7 @@
package compat package compat
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
@ -45,13 +46,15 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
fromStart = true fromStart = true
} }
eventCtx, eventCancel := context.WithCancel(r.Context())
eventChannel := make(chan *events.Event) eventChannel := make(chan *events.Event)
go func() { go func() {
readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until} readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until}
eventsError = runtime.Events(readOpts) eventsError = runtime.Events(eventCtx, readOpts)
}() }()
if eventsError != nil { if eventsError != nil {
utils.InternalServerError(w, eventsError) utils.InternalServerError(w, eventsError)
eventCancel()
close(eventChannel) close(eventChannel)
return return
} }
@ -59,6 +62,7 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
// If client disappears we need to stop listening for events // If client disappears we need to stop listening for events
go func(done <-chan struct{}) { go func(done <-chan struct{}) {
<-done <-done
eventCancel()
if _, ok := <-eventChannel; ok { if _, ok := <-eventChannel; ok {
close(eventChannel) close(eventChannel)
} }

View File

@ -741,7 +741,7 @@ func (ic *ContainerEngine) ContainerStart(ctx context.Context, namesOrIds []stri
if ecode, err := ctr.Wait(); err != nil { if ecode, err := ctr.Wait(); err != nil {
if errors.Cause(err) == define.ErrNoSuchCtr { if errors.Cause(err) == define.ErrNoSuchCtr {
// Check events // Check events
event, err := ic.Libpod.GetLastContainerEvent(ctr.ID(), events.Exited) event, err := ic.Libpod.GetLastContainerEvent(ctx, ctr.ID(), events.Exited)
if err != nil { if err != nil {
logrus.Errorf("Cannot get exit code: %v", err) logrus.Errorf("Cannot get exit code: %v", err)
exitCode = define.ExecErrorCodeNotFound exitCode = define.ExecErrorCodeNotFound
@ -871,7 +871,7 @@ func (ic *ContainerEngine) ContainerRun(ctx context.Context, opts entities.Conta
if ecode, err := ctr.Wait(); err != nil { if ecode, err := ctr.Wait(); err != nil {
if errors.Cause(err) == define.ErrNoSuchCtr { if errors.Cause(err) == define.ErrNoSuchCtr {
// Check events // Check events
event, err := ic.Libpod.GetLastContainerEvent(ctr.ID(), events.Exited) event, err := ic.Libpod.GetLastContainerEvent(ctx, ctr.ID(), events.Exited)
if err != nil { if err != nil {
logrus.Errorf("Cannot get exit code: %v", err) logrus.Errorf("Cannot get exit code: %v", err)
report.ExitCode = define.ExecErrorCodeNotFound report.ExitCode = define.ExecErrorCodeNotFound

View File

@ -9,5 +9,5 @@ import (
func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptions) error { func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptions) error {
readOpts := events.ReadOptions{FromStart: opts.FromStart, Stream: opts.Stream, Filters: opts.Filter, EventChannel: opts.EventChan, Since: opts.Since, Until: opts.Until} readOpts := events.ReadOptions{FromStart: opts.FromStart, Stream: opts.Stream, Filters: opts.Filter, EventChannel: opts.EventChan, Since: opts.Since, Until: opts.Until}
return ic.Libpod.Events(readOpts) return ic.Libpod.Events(ctx, readOpts)
} }

View File

@ -4,6 +4,7 @@ package varlinkapi
import ( import (
"bufio" "bufio"
"context"
"io" "io"
"github.com/containers/libpod/libpod" "github.com/containers/libpod/libpod"
@ -89,7 +90,7 @@ func (i *VarlinkAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys s
if ecode, err := ctr.Wait(); err != nil { if ecode, err := ctr.Wait(); err != nil {
if errors.Cause(err) == define.ErrNoSuchCtr { if errors.Cause(err) == define.ErrNoSuchCtr {
// Check events // Check events
event, err := i.Runtime.GetLastContainerEvent(ctr.ID(), events.Exited) event, err := i.Runtime.GetLastContainerEvent(context.Background(), ctr.ID(), events.Exited)
if err != nil { if err != nil {
logrus.Errorf("Cannot get exit code: %v", err) logrus.Errorf("Cannot get exit code: %v", err)
exitCode = define.ExecErrorCodeNotFound exitCode = define.ExecErrorCodeNotFound

View File

@ -3,6 +3,7 @@
package varlinkapi package varlinkapi
import ( import (
"context"
"time" "time"
"github.com/containers/libpod/libpod/events" "github.com/containers/libpod/libpod/events"
@ -27,7 +28,7 @@ func (i *VarlinkAPI) GetEvents(call iopodman.VarlinkCall, filter []string, since
eventChannel := make(chan *events.Event) eventChannel := make(chan *events.Event)
go func() { go func() {
readOpts := events.ReadOptions{FromStart: fromStart, Stream: stream, Filters: filter, EventChannel: eventChannel} readOpts := events.ReadOptions{FromStart: fromStart, Stream: stream, Filters: filter, EventChannel: eventChannel}
eventsError = i.Runtime.Events(readOpts) eventsError = i.Runtime.Events(context.Background(), readOpts)
}() }()
if eventsError != nil { if eventsError != nil {
return call.ReplyErrorOccurred(eventsError.Error()) return call.ReplyErrorOccurred(eventsError.Error())