Merge pull request #6407 from baude/v2eventsstream

Add streaming ability to endpoint
This commit is contained in:
OpenShift Merge Robot
2020-05-27 15:07:32 -04:00
committed by GitHub
8 changed files with 72 additions and 43 deletions

View File

@ -26,7 +26,10 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
Since string `schema:"since"` Since string `schema:"since"`
Until string `schema:"until"` Until string `schema:"until"`
Filters map[string][]string `schema:"filters"` Filters map[string][]string `schema:"filters"`
}{} Stream bool `schema:"stream"`
}{
Stream: true,
}
if err := decoder.Decode(&query, r.URL.Query()); err != nil { if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String())) utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String()))
} }
@ -41,9 +44,10 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
if len(query.Since) > 0 || len(query.Until) > 0 { if len(query.Since) > 0 || len(query.Until) > 0 {
fromStart = true fromStart = true
} }
eventChannel := make(chan *events.Event) eventChannel := make(chan *events.Event)
go func() { go func() {
readOpts := events.ReadOptions{FromStart: fromStart, Stream: true, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until} readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until}
eventsError = runtime.Events(readOpts) eventsError = runtime.Events(readOpts)
}() }()
if eventsError != nil { if eventsError != nil {
@ -55,7 +59,9 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
// If client disappears we need to stop listening for events // If client disappears we need to stop listening for events
go func(done <-chan struct{}) { go func(done <-chan struct{}) {
<-done <-done
if _, ok := <-eventChannel; ok {
close(eventChannel) close(eventChannel)
}
}(r.Context().Done()) }(r.Context().Done())
// Headers need to be written out before turning Writer() over to json encoder // Headers need to be written out before turning Writer() over to json encoder

View File

@ -58,6 +58,11 @@ func (s *APIServer) registerEventsHandlers(r *mux.Router) error {
// type: string // type: string
// in: query // in: query
// description: JSON encoded map[string][]string of constraints // description: JSON encoded map[string][]string of constraints
// - name: stream
// type: boolean
// in: query
// default: true
// description: when false, do not follow events
// responses: // responses:
// 200: // 200:
// description: returns a string of json data describing an event // description: returns a string of json data describing an event

View File

@ -20,7 +20,7 @@ import (
// Events allows you to monitor libdpod related events like container creation and // Events allows you to monitor libdpod related events like container creation and
// removal. The events are then passed to the eventChan provided. The optional cancelChan // removal. The events are then passed to the eventChan provided. The optional cancelChan
// can be used to cancel the read of events and close down the HTTP connection. // can be used to cancel the read of events and close down the HTTP connection.
func Events(ctx context.Context, eventChan chan (entities.Event), cancelChan chan bool, since, until *string, filters map[string][]string) error { func Events(ctx context.Context, eventChan chan entities.Event, cancelChan chan bool, since, until *string, filters map[string][]string, stream *bool) error {
conn, err := bindings.GetClient(ctx) conn, err := bindings.GetClient(ctx)
if err != nil { if err != nil {
return err return err
@ -32,6 +32,9 @@ func Events(ctx context.Context, eventChan chan (entities.Event), cancelChan cha
if until != nil { if until != nil {
params.Set("until", *until) params.Set("until", *until)
} }
if stream != nil {
params.Set("stream", strconv.FormatBool(*stream))
}
if filters != nil { if filters != nil {
filterString, err := bindings.FiltersToString(filters) filterString, err := bindings.FiltersToString(filters)
if err != nil { if err != nil {
@ -50,18 +53,24 @@ func Events(ctx context.Context, eventChan chan (entities.Event), cancelChan cha
logrus.Error(errors.Wrap(err, "unable to close event response body")) logrus.Error(errors.Wrap(err, "unable to close event response body"))
}() }()
} }
dec := json.NewDecoder(response.Body) dec := json.NewDecoder(response.Body)
for { for err = (error)(nil); err == nil; {
e := entities.Event{} var e = entities.Event{}
if err := dec.Decode(&e); err != nil { err = dec.Decode(&e)
if err == io.EOF { if err == nil {
break
}
return errors.Wrap(err, "unable to decode event response")
}
eventChan <- e eventChan <- e
} }
}
close(eventChan)
switch {
case err == nil:
return nil return nil
case errors.Is(err, io.EOF):
return nil
default:
return errors.Wrap(err, "unable to decode event response")
}
} }
// Prune removes all unused system data. // Prune removes all unused system data.

View File

@ -47,13 +47,13 @@ var _ = Describe("Podman system", func() {
} }
}() }()
go func() { go func() {
system.Events(bt.conn, eChan, cancelChan, nil, nil, nil) system.Events(bt.conn, eChan, cancelChan, nil, nil, nil, bindings.PFalse)
}() }()
_, err := bt.RunTopContainer(nil, nil, nil) _, err := bt.RunTopContainer(nil, nil, nil)
Expect(err).To(BeNil()) Expect(err).To(BeNil())
cancelChan <- true cancelChan <- true
Expect(len(messages)).To(BeNumerically("==", 3)) Expect(len(messages)).To(BeNumerically("==", 5))
}) })
It("podman system prune - pod,container stopped", func() { It("podman system prune - pod,container stopped", func() {

View File

@ -5,12 +5,9 @@ import (
"github.com/containers/libpod/libpod/events" "github.com/containers/libpod/libpod/events"
"github.com/containers/libpod/pkg/domain/entities" "github.com/containers/libpod/pkg/domain/entities"
"github.com/sirupsen/logrus"
) )
func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptions) error { func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptions) error {
readOpts := events.ReadOptions{FromStart: opts.FromStart, Stream: opts.Stream, Filters: opts.Filter, EventChannel: opts.EventChan, Since: opts.Since, Until: opts.Until} readOpts := events.ReadOptions{FromStart: opts.FromStart, Stream: opts.Stream, Filters: opts.Filter, EventChannel: opts.EventChan, Since: opts.Since, Until: opts.Until}
err := ic.Libpod.Events(readOpts) return ic.Libpod.Events(readOpts)
logrus.Error(err)
return err
} }

View File

@ -25,6 +25,7 @@ func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptio
for e := range binChan { for e := range binChan {
opts.EventChan <- entities.ConvertToLibpodEvent(e) opts.EventChan <- entities.ConvertToLibpodEvent(e)
} }
close(opts.EventChan)
}() }()
return system.Events(ic.ClientCxt, binChan, nil, &opts.Since, &opts.Until, filters) return system.Events(ic.ClientCxt, binChan, nil, &opts.Since, &opts.Until, filters, &opts.Stream)
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/containers/storage/pkg/reexec" "github.com/containers/storage/pkg/reexec"
"github.com/containers/storage/pkg/stringid" "github.com/containers/storage/pkg/stringid"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
. "github.com/onsi/gomega/gexec" . "github.com/onsi/gomega/gexec"
@ -573,3 +574,10 @@ func (p *PodmanTestIntegration) CreateSeccompJson(in []byte) (string, error) {
} }
return jsonFile, nil return jsonFile, nil
} }
func SkipIfNotFedora() {
info := GetHostDistributionInfo()
if info.Distribution != "fedora" {
ginkgo.Skip("Test can only run on Fedora")
}
}

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
"strings" "strings"
"time"
. "github.com/containers/libpod/test/utils" . "github.com/containers/libpod/test/utils"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -24,23 +25,26 @@ var _ = Describe("Podman events", func() {
os.Exit(1) os.Exit(1)
} }
podmanTest = PodmanTestCreate(tempdir) podmanTest = PodmanTestCreate(tempdir)
podmanTest.Setup()
podmanTest.SeedImages() podmanTest.SeedImages()
}) })
AfterEach(func() { AfterEach(func() {
podmanTest.Cleanup() podmanTest.Cleanup()
f := CurrentGinkgoTestDescription() f := CurrentGinkgoTestDescription()
timedResult := fmt.Sprintf("Test: %s completed in %f seconds", f.TestText, f.Duration.Seconds()) processTestResult(f)
GinkgoWriter.Write([]byte(timedResult))
}) })
// For most, all, of these tests we do not "live" test following a log because it may make a fragile test // For most, all, of these tests we do not "live" test following a log because it may make a fragile test
// system more complex. Instead we run the "events" and then verify that the events are processed correctly. // system more complex. Instead we run the "events" and then verify that the events are processed correctly.
// Perhaps a future version of this test would put events in a go func and send output back over a channel // Perhaps a future version of this test would put events in a go func and send output back over a channel
// while events occur. // while events occur.
// These tests are only known to work on Fedora ATM. Other distributions
// will be skipped.
It("podman events", func() { It("podman events", func() {
Skip("need to verify images have correct packages for journald") SkipIfRootless()
SkipIfNotFedora()
_, ec, _ := podmanTest.RunLsContainer("") _, ec, _ := podmanTest.RunLsContainer("")
Expect(ec).To(Equal(0)) Expect(ec).To(Equal(0))
result := podmanTest.Podman([]string{"events", "--stream=false"}) result := podmanTest.Podman([]string{"events", "--stream=false"})
@ -49,7 +53,8 @@ var _ = Describe("Podman events", func() {
}) })
It("podman events with an event filter", func() { It("podman events with an event filter", func() {
Skip("need to verify images have correct packages for journald") SkipIfRootless()
SkipIfNotFedora()
_, ec, _ := podmanTest.RunLsContainer("") _, ec, _ := podmanTest.RunLsContainer("")
Expect(ec).To(Equal(0)) Expect(ec).To(Equal(0))
result := podmanTest.Podman([]string{"events", "--stream=false", "--filter", "event=start"}) result := podmanTest.Podman([]string{"events", "--stream=false", "--filter", "event=start"})
@ -59,11 +64,14 @@ var _ = Describe("Podman events", func() {
}) })
It("podman events with an event filter and container=cid", func() { It("podman events with an event filter and container=cid", func() {
Skip("need to verify images have correct packages for journald") Skip("Does not work on v2")
SkipIfRootless()
SkipIfNotFedora()
_, ec, cid := podmanTest.RunLsContainer("") _, ec, cid := podmanTest.RunLsContainer("")
Expect(ec).To(Equal(0)) Expect(ec).To(Equal(0))
_, ec2, cid2 := podmanTest.RunLsContainer("") _, ec2, cid2 := podmanTest.RunLsContainer("")
Expect(ec2).To(Equal(0)) Expect(ec2).To(Equal(0))
time.Sleep(5 * time.Second)
result := podmanTest.Podman([]string{"events", "--stream=false", "--filter", "event=start", "--filter", fmt.Sprintf("container=%s", cid)}) result := podmanTest.Podman([]string{"events", "--stream=false", "--filter", "event=start", "--filter", fmt.Sprintf("container=%s", cid)})
result.WaitWithDefaultTimeout() result.WaitWithDefaultTimeout()
Expect(result.ExitCode()).To(Equal(0)) Expect(result.ExitCode()).To(Equal(0))
@ -72,7 +80,8 @@ var _ = Describe("Podman events", func() {
}) })
It("podman events with a type and filter container=id", func() { It("podman events with a type and filter container=id", func() {
Skip("need to verify images have correct packages for journald") SkipIfRootless()
SkipIfNotFedora()
_, ec, cid := podmanTest.RunLsContainer("") _, ec, cid := podmanTest.RunLsContainer("")
Expect(ec).To(Equal(0)) Expect(ec).To(Equal(0))
result := podmanTest.Podman([]string{"events", "--stream=false", "--filter", "type=pod", "--filter", fmt.Sprintf("container=%s", cid)}) result := podmanTest.Podman([]string{"events", "--stream=false", "--filter", "type=pod", "--filter", fmt.Sprintf("container=%s", cid)})
@ -82,7 +91,8 @@ var _ = Describe("Podman events", func() {
}) })
It("podman events with a type", func() { It("podman events with a type", func() {
Skip("need to verify images have correct packages for journald") SkipIfRootless()
SkipIfNotFedora()
setup := podmanTest.Podman([]string{"run", "-dt", "--pod", "new:foobarpod", ALPINE, "top"}) setup := podmanTest.Podman([]string{"run", "-dt", "--pod", "new:foobarpod", ALPINE, "top"})
setup.WaitWithDefaultTimeout() setup.WaitWithDefaultTimeout()
stop := podmanTest.Podman([]string{"pod", "stop", "foobarpod"}) stop := podmanTest.Podman([]string{"pod", "stop", "foobarpod"})
@ -97,7 +107,8 @@ var _ = Describe("Podman events", func() {
}) })
It("podman events --since", func() { It("podman events --since", func() {
Skip("need to verify images have correct packages for journald") SkipIfRootless()
SkipIfNotFedora()
_, ec, _ := podmanTest.RunLsContainer("") _, ec, _ := podmanTest.RunLsContainer("")
Expect(ec).To(Equal(0)) Expect(ec).To(Equal(0))
result := podmanTest.Podman([]string{"events", "--stream=false", "--since", "1m"}) result := podmanTest.Podman([]string{"events", "--stream=false", "--since", "1m"})
@ -106,7 +117,8 @@ var _ = Describe("Podman events", func() {
}) })
It("podman events --until", func() { It("podman events --until", func() {
Skip("need to verify images have correct packages for journald") SkipIfRootless()
SkipIfNotFedora()
_, ec, _ := podmanTest.RunLsContainer("") _, ec, _ := podmanTest.RunLsContainer("")
Expect(ec).To(Equal(0)) Expect(ec).To(Equal(0))
test := podmanTest.Podman([]string{"events", "--help"}) test := podmanTest.Podman([]string{"events", "--help"})
@ -118,37 +130,28 @@ var _ = Describe("Podman events", func() {
}) })
It("podman events format", func() { It("podman events format", func() {
Skip(v2remotefail) SkipIfRootless()
info := GetHostDistributionInfo() SkipIfNotFedora()
if info.Distribution != "fedora" {
Skip("need to verify images have correct packages for journald")
}
_, ec, _ := podmanTest.RunLsContainer("") _, ec, _ := podmanTest.RunLsContainer("")
Expect(ec).To(Equal(0)) Expect(ec).To(Equal(0))
test := podmanTest.Podman([]string{"events", "--stream=false", "--format", "json"}) test := podmanTest.Podman([]string{"events", "--stream=false", "--format", "json"})
test.WaitWithDefaultTimeout() test.WaitWithDefaultTimeout()
fmt.Println(test.OutputToStringArray())
jsonArr := test.OutputToStringArray() jsonArr := test.OutputToStringArray()
Expect(len(jsonArr)).To(Not(BeZero())) Expect(len(jsonArr)).To(Not(BeZero()))
eventsMap := make(map[string]string) eventsMap := make(map[string]string)
err := json.Unmarshal([]byte(jsonArr[0]), &eventsMap) err := json.Unmarshal([]byte(jsonArr[0]), &eventsMap)
if err != nil { Expect(err).To(BeNil())
os.Exit(1)
}
_, exist := eventsMap["Status"] _, exist := eventsMap["Status"]
Expect(exist).To(BeTrue()) Expect(exist).To(BeTrue())
Expect(test.ExitCode()).To(BeZero()) Expect(test.ExitCode()).To(BeZero())
test = podmanTest.Podman([]string{"events", "--stream=false", "--format", "{{json.}}"}) test = podmanTest.Podman([]string{"events", "--stream=false", "--format", "{{json.}}"})
test.WaitWithDefaultTimeout() test.WaitWithDefaultTimeout()
fmt.Println(test.OutputToStringArray())
jsonArr = test.OutputToStringArray() jsonArr = test.OutputToStringArray()
Expect(len(jsonArr)).To(Not(BeZero())) Expect(len(jsonArr)).To(Not(BeZero()))
eventsMap = make(map[string]string) eventsMap = make(map[string]string)
err = json.Unmarshal([]byte(jsonArr[0]), &eventsMap) err = json.Unmarshal([]byte(jsonArr[0]), &eventsMap)
if err != nil { Expect(err).To(BeNil())
os.Exit(1)
}
_, exist = eventsMap["Status"] _, exist = eventsMap["Status"]
Expect(exist).To(BeTrue()) Expect(exist).To(BeTrue())
Expect(test.ExitCode()).To(BeZero()) Expect(test.ExitCode()).To(BeZero())