mirror of
https://github.com/containers/podman.git
synced 2025-07-02 00:30:00 +08:00
event backend journald: fix problem with empty journal
Currently podman events will just fail with `Error: failed to get journal cursor: failed to get cursor: cannot assign requested address` when the journal contains zero podman events. The problem is that we are using the journal accessors wrong. There is no need to call GetCursor() and compare them manually. The Next() return an integer which tells if it moved to the next or not. This means the we can remove GetCursor() which would fail when there is no entry. This also includes another bug fix. Previously the logic called Next() twice for the first entry which caused us to miss the first entry. To reproduce this issue you can run the following commands: ``` sudo journalctl --rotate sudo journalctl --vacuum-time=1s ``` Note that this will delete the full journal. Now run podman events and it fails but with this patch it works. Now generate a single event, i.e. podman pull alpine, and run podman events --until 1s. I am not sure how to get a reliable test into CI, I really do not want to delete the journal and developer or CI systems. Fixes second part of #15688 Signed-off-by: Paul Holzinger <pholzing@redhat.com>
This commit is contained in:
@ -112,57 +112,16 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
|
||||
}
|
||||
}
|
||||
|
||||
// the api requires a next|prev before getting a cursor
|
||||
if _, err := j.Next(); err != nil {
|
||||
return fmt.Errorf("failed to move journal cursor to next entry: %w", err)
|
||||
}
|
||||
|
||||
prevCursor, err := j.GetCursor()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get journal cursor: %w", err)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// the consumer has cancelled
|
||||
entry, err := getNextEntry(ctx, j, options.Stream, untilTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// no entry == we hit the end
|
||||
if entry == nil {
|
||||
return nil
|
||||
default:
|
||||
// fallthrough
|
||||
}
|
||||
|
||||
if _, err := j.Next(); err != nil {
|
||||
return fmt.Errorf("failed to move journal cursor to next entry: %w", err)
|
||||
}
|
||||
newCursor, err := j.GetCursor()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get journal cursor: %w", err)
|
||||
}
|
||||
if prevCursor == newCursor {
|
||||
if !options.Stream || (len(options.Until) > 0 && time.Now().After(untilTime)) {
|
||||
break
|
||||
}
|
||||
|
||||
// j.Wait() is blocking, this would cause the goroutine to hang forever
|
||||
// if no more journal entries are generated and thus if the client
|
||||
// has closed the connection in the meantime to leak memory.
|
||||
// Waiting only 5 seconds makes sure we can check if the client closed in the
|
||||
// meantime at least every 5 seconds.
|
||||
t := 5 * time.Second
|
||||
if len(options.Until) > 0 {
|
||||
until := time.Until(untilTime)
|
||||
if until < t {
|
||||
t = until
|
||||
}
|
||||
}
|
||||
_ = j.Wait(t)
|
||||
continue
|
||||
}
|
||||
prevCursor = newCursor
|
||||
|
||||
entry, err := j.GetEntry()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read journal entry: %w", err)
|
||||
}
|
||||
newEvent, err := newEventFromJournalEntry(entry)
|
||||
if err != nil {
|
||||
// We can't decode this event.
|
||||
@ -177,7 +136,6 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
|
||||
options.EventChannel <- newEvent
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) {
|
||||
@ -238,3 +196,51 @@ func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) {
|
||||
func (e EventJournalD) String() string {
|
||||
return Journald.String()
|
||||
}
|
||||
|
||||
// getNextEntry returns the next entry in the journal. If the end of the
|
||||
// journal is reached and stream is not set or the current time is after
|
||||
// the until time this function return nil,nil.
|
||||
func getNextEntry(ctx context.Context, j *sdjournal.Journal, stream bool, untilTime time.Time) (*sdjournal.JournalEntry, error) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// the consumer has cancelled
|
||||
return nil, nil
|
||||
default:
|
||||
// fallthrough
|
||||
}
|
||||
// the api requires a next|prev before reading the event
|
||||
ret, err := j.Next()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to move journal cursor to next entry: %w", err)
|
||||
}
|
||||
// ret == 0 equals EOF, see sd_journal_next(3)
|
||||
if ret == 0 {
|
||||
if !stream || (!untilTime.IsZero() && time.Now().After(untilTime)) {
|
||||
// we hit the end and should not keep streaming
|
||||
return nil, nil
|
||||
}
|
||||
// keep waiting for the next entry
|
||||
// j.Wait() is blocking, this would cause the goroutine to hang forever
|
||||
// if no more journal entries are generated and thus if the client
|
||||
// has closed the connection in the meantime to leak memory.
|
||||
// Waiting only 5 seconds makes sure we can check if the client closed in the
|
||||
// meantime at least every 5 seconds.
|
||||
t := 5 * time.Second
|
||||
if !untilTime.IsZero() {
|
||||
until := time.Until(untilTime)
|
||||
if until < t {
|
||||
t = until
|
||||
}
|
||||
}
|
||||
_ = j.Wait(t)
|
||||
continue
|
||||
}
|
||||
|
||||
entry, err := j.GetEntry()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read journal entry: %w", err)
|
||||
}
|
||||
return entry, nil
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user