mirror of
https://github.com/containers/podman.git
synced 2025-10-17 19:24:04 +08:00
events endpoint: fix panic and race condition
Fix a potential panic in the events endpoint when parsing the filters parameter. Values of the filters map might be empty, so we need to account for that instead of uncondtitionally accessing the first item. Also apply a similar for race conditions as done in commit f4a2d25c0fca: Fix a race that could cause read errors to be masked. Masking such errors is likely to report red herrings since users don't see that reading failed for some reasons but that a given event could not be found. Another race was the handler closing event channel, which could lead to two kinds of panics: double close, send to close channel. The backend takes care of that. However, make sure that the backend stops working in case the context has been cancelled. Fixes: #6899 Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
This commit is contained in:
@ -90,6 +90,13 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// the consumer has cancelled
|
||||
return nil
|
||||
default:
|
||||
// fallthrough
|
||||
}
|
||||
if _, err := j.Next(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -63,6 +63,14 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
|
||||
}
|
||||
}()
|
||||
for line := range t.Lines {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// the consumer has cancelled
|
||||
return nil
|
||||
default:
|
||||
// fallthrough
|
||||
}
|
||||
|
||||
event, err := newEventFromJSONString(line.Text)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1,9 +1,9 @@
|
||||
package compat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/containers/libpod/v2/libpod"
|
||||
"github.com/containers/libpod/v2/libpod/events"
|
||||
@ -17,10 +17,10 @@ import (
|
||||
|
||||
func GetEvents(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
fromStart bool
|
||||
eventsError error
|
||||
decoder = r.Context().Value("decoder").(*schema.Decoder)
|
||||
runtime = r.Context().Value("runtime").(*libpod.Runtime)
|
||||
fromStart bool
|
||||
decoder = r.Context().Value("decoder").(*schema.Decoder)
|
||||
runtime = r.Context().Value("runtime").(*libpod.Runtime)
|
||||
json = jsoniter.ConfigCompatibleWithStandardLibrary // FIXME: this should happen on the package level
|
||||
)
|
||||
|
||||
query := struct {
|
||||
@ -33,11 +33,16 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
if err := decoder.Decode(&query, r.URL.Query()); err != nil {
|
||||
utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String()))
|
||||
return
|
||||
}
|
||||
|
||||
var libpodFilters = []string{}
|
||||
if _, found := r.URL.Query()["filters"]; found {
|
||||
for k, v := range query.Filters {
|
||||
if len(v) == 0 {
|
||||
utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Errorf("empty value for filter %q", k))
|
||||
return
|
||||
}
|
||||
libpodFilters = append(libpodFilters, fmt.Sprintf("%s=%s", k, v[0]))
|
||||
}
|
||||
}
|
||||
@ -46,46 +51,57 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
|
||||
fromStart = true
|
||||
}
|
||||
|
||||
eventCtx, eventCancel := context.WithCancel(r.Context())
|
||||
eventChannel := make(chan *events.Event)
|
||||
errorChannel := make(chan error)
|
||||
|
||||
// Start reading events.
|
||||
go func() {
|
||||
readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until}
|
||||
eventsError = runtime.Events(eventCtx, readOpts)
|
||||
readOpts := events.ReadOptions{
|
||||
FromStart: fromStart,
|
||||
Stream: query.Stream,
|
||||
Filters: libpodFilters,
|
||||
EventChannel: eventChannel,
|
||||
Since: query.Since,
|
||||
Until: query.Until,
|
||||
}
|
||||
errorChannel <- runtime.Events(r.Context(), readOpts)
|
||||
}()
|
||||
if eventsError != nil {
|
||||
utils.InternalServerError(w, eventsError)
|
||||
eventCancel()
|
||||
close(eventChannel)
|
||||
return
|
||||
}
|
||||
|
||||
// If client disappears we need to stop listening for events
|
||||
go func(done <-chan struct{}) {
|
||||
<-done
|
||||
eventCancel()
|
||||
if _, ok := <-eventChannel; ok {
|
||||
close(eventChannel)
|
||||
var coder *jsoniter.Encoder
|
||||
var writeHeader sync.Once
|
||||
|
||||
for stream := true; stream; stream = query.Stream {
|
||||
select {
|
||||
case err := <-errorChannel:
|
||||
if err != nil {
|
||||
utils.InternalServerError(w, err)
|
||||
return
|
||||
}
|
||||
case evt := <-eventChannel:
|
||||
writeHeader.Do(func() {
|
||||
// Use a sync.Once so that we write the header
|
||||
// only once.
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
if flusher, ok := w.(http.Flusher); ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
coder = json.NewEncoder(w)
|
||||
coder.SetEscapeHTML(true)
|
||||
})
|
||||
|
||||
if evt == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
e := entities.ConvertToEntitiesEvent(*evt)
|
||||
if err := coder.Encode(e); err != nil {
|
||||
logrus.Errorf("unable to write json: %q", err)
|
||||
}
|
||||
if flusher, ok := w.(http.Flusher); ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}(r.Context().Done())
|
||||
|
||||
// Headers need to be written out before turning Writer() over to json encoder
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
if flusher, ok := w.(http.Flusher); ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
json := jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
coder := json.NewEncoder(w)
|
||||
coder.SetEscapeHTML(true)
|
||||
|
||||
for event := range eventChannel {
|
||||
e := entities.ConvertToEntitiesEvent(*event)
|
||||
if err := coder.Encode(e); err != nil {
|
||||
logrus.Errorf("unable to write json: %q", err)
|
||||
}
|
||||
if flusher, ok := w.(http.Flusher); ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package test_bindings
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/containers/libpod/v2/pkg/bindings"
|
||||
@ -38,22 +39,28 @@ var _ = Describe("Podman system", func() {
|
||||
})
|
||||
|
||||
It("podman events", func() {
|
||||
eChan := make(chan entities.Event, 1)
|
||||
var messages []entities.Event
|
||||
cancelChan := make(chan bool, 1)
|
||||
var name = "top"
|
||||
_, err := bt.RunTopContainer(&name, bindings.PFalse, nil)
|
||||
Expect(err).To(BeNil())
|
||||
|
||||
filters := make(map[string][]string)
|
||||
filters["container"] = []string{name}
|
||||
|
||||
binChan := make(chan entities.Event)
|
||||
done := sync.Mutex{}
|
||||
done.Lock()
|
||||
eventCounter := 0
|
||||
go func() {
|
||||
for e := range eChan {
|
||||
messages = append(messages, e)
|
||||
defer done.Unlock()
|
||||
for range binChan {
|
||||
eventCounter++
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
system.Events(bt.conn, eChan, cancelChan, nil, nil, nil, bindings.PFalse)
|
||||
}()
|
||||
|
||||
_, err := bt.RunTopContainer(nil, nil, nil)
|
||||
err = system.Events(bt.conn, binChan, nil, nil, nil, filters, bindings.PFalse)
|
||||
Expect(err).To(BeNil())
|
||||
cancelChan <- true
|
||||
Expect(len(messages)).To(BeNumerically("==", 5))
|
||||
done.Lock()
|
||||
Expect(eventCounter).To(BeNumerically(">", 0))
|
||||
})
|
||||
|
||||
It("podman system prune - pod,container stopped", func() {
|
||||
|
@ -136,6 +136,7 @@ var _ = Describe("Podman events", func() {
|
||||
Expect(ec).To(Equal(0))
|
||||
test := podmanTest.Podman([]string{"events", "--stream=false", "--format", "json"})
|
||||
test.WaitWithDefaultTimeout()
|
||||
Expect(test.ExitCode()).To(BeZero())
|
||||
jsonArr := test.OutputToStringArray()
|
||||
Expect(len(jsonArr)).To(Not(BeZero()))
|
||||
eventsMap := make(map[string]string)
|
||||
@ -143,10 +144,10 @@ var _ = Describe("Podman events", func() {
|
||||
Expect(err).To(BeNil())
|
||||
_, exist := eventsMap["Status"]
|
||||
Expect(exist).To(BeTrue())
|
||||
Expect(test.ExitCode()).To(BeZero())
|
||||
|
||||
test = podmanTest.Podman([]string{"events", "--stream=false", "--format", "{{json.}}"})
|
||||
test.WaitWithDefaultTimeout()
|
||||
Expect(test.ExitCode()).To(BeZero())
|
||||
jsonArr = test.OutputToStringArray()
|
||||
Expect(len(jsonArr)).To(Not(BeZero()))
|
||||
eventsMap = make(map[string]string)
|
||||
@ -154,6 +155,5 @@ var _ = Describe("Podman events", func() {
|
||||
Expect(err).To(BeNil())
|
||||
_, exist = eventsMap["Status"]
|
||||
Expect(exist).To(BeTrue())
|
||||
Expect(test.ExitCode()).To(BeZero())
|
||||
})
|
||||
})
|
||||
|
Reference in New Issue
Block a user