From 5ebd24dfb24fe9fd59206c6cf390f7fbc1b2f81b Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 8 Jul 2024 10:53:28 +0200 Subject: [PATCH] chore: Convert iterators in pattern module to `v2.Iterator` interface (#13407) This is first follow up to #13273 It changes the iterator from the pattern module to use the `v2.CloseIterator` to avoid re-implementing the iterator logic. With this change, the function `Error()` on the pattern iterator is renamed to `Err()` to conform with the `v2.Iterator` interface. Signed-off-by: Christian Haudum --- pkg/iter/iterator.go | 8 +-- pkg/iter/v2/iter.go | 27 ++++++++-- pkg/pattern/iter/batch.go | 2 +- pkg/pattern/iter/iterator.go | 83 ++++++++----------------------- pkg/pattern/iter/iterator_test.go | 4 +- pkg/pattern/iter/merge_pattern.go | 2 +- pkg/pattern/iter/query_client.go | 2 +- 7 files changed, 52 insertions(+), 76 deletions(-) diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 4b4ef03cb9..970acaa30a 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -7,11 +7,11 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" ) -type Value interface { +type logprotoType interface { logproto.Entry | logproto.Sample } -type StreamIterator[T Value] interface { +type StreamIterator[T logprotoType] interface { v2.CloseIterator[T] // Labels returns the labels for the current entry. // The labels can be mutated by the query engine and not reflect the original stream. @@ -24,7 +24,7 @@ type EntryIterator StreamIterator[logproto.Entry] type SampleIterator StreamIterator[logproto.Sample] // noOpIterator implements StreamIterator -type noOpIterator[T Value] struct{} +type noOpIterator[T logprotoType] struct{} func (noOpIterator[T]) Next() bool { return false } func (noOpIterator[T]) Err() error { return nil } @@ -37,7 +37,7 @@ var NoopEntryIterator = noOpIterator[logproto.Entry]{} var NoopSampleIterator = noOpIterator[logproto.Sample]{} // errorIterator implements StreamIterator -type errorIterator[T Value] struct{} +type errorIterator[T logprotoType] struct{} func (errorIterator[T]) Next() bool { return false } func (errorIterator[T]) Err() error { return errors.New("error") } diff --git a/pkg/iter/v2/iter.go b/pkg/iter/v2/iter.go index 45e17fcad5..506b1a4f35 100644 --- a/pkg/iter/v2/iter.go +++ b/pkg/iter/v2/iter.go @@ -159,15 +159,15 @@ func NewCancelableIter[T any](ctx context.Context, itr Iterator[T]) *Cancellable return &CancellableIter[T]{ctx: ctx, Iterator: itr} } -func NewCloseableIterator[T io.Closer](itr Iterator[T]) *CloseIter[T] { - return &CloseIter[T]{itr} +func NewCloserIter[T io.Closer](itr Iterator[T]) *CloserIter[T] { + return &CloserIter[T]{itr} } -type CloseIter[T io.Closer] struct { +type CloserIter[T io.Closer] struct { Iterator[T] } -func (i *CloseIter[T]) Close() error { +func (i *CloserIter[T]) Close() error { return i.At().Close() } @@ -226,3 +226,22 @@ func (it *CounterIter[T]) Next() bool { func (it *CounterIter[T]) Count() int { return it.count } + +func WithClose[T any](itr Iterator[T], close func() bool) *CloseIter[T] { + return &CloseIter[T]{ + Iterator: itr, + close: close, + } +} + +type CloseIter[T any] struct { + Iterator[T] + close func() bool +} + +func (i *CloseIter[T]) Close() error { + if i.close != nil { + return i.Close() + } + return nil +} diff --git a/pkg/pattern/iter/batch.go b/pkg/pattern/iter/batch.go index c61fa6ee17..34f0137911 100644 --- a/pkg/pattern/iter/batch.go +++ b/pkg/pattern/iter/batch.go @@ -31,7 +31,7 @@ func ReadBatch(it Iterator, batchSize int) (*logproto.QueryPatternsResponse, err Samples: samples, }) } - return &result, it.Error() + return &result, it.Err() } func ReadAll(it Iterator) (*logproto.QueryPatternsResponse, error) { diff --git a/pkg/pattern/iter/iterator.go b/pkg/pattern/iter/iterator.go index 7bbdb0ed27..834e792d07 100644 --- a/pkg/pattern/iter/iterator.go +++ b/pkg/pattern/iter/iterator.go @@ -1,82 +1,39 @@ package iter import ( + iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/logproto" ) -var Empty Iterator = &emptyIterator{} - -// TODO(chaudum): inline v2.Iteratpr[logproto.PatternSample] type Iterator interface { - Next() bool + iter.CloseIterator[logproto.PatternSample] Pattern() string - At() logproto.PatternSample - - Error() error - Close() error } -func NewSlice(pattern string, s []logproto.PatternSample) Iterator { - // TODO(chaudum): replace with v2.NewSliceIter() - return &sliceIterator{ - values: s, - pattern: pattern, - i: -1, +func NewSlice(pattern string, s []logproto.PatternSample) *PatternIter { + return &PatternIter{ + CloseIterator: iter.WithClose(iter.NewSliceIter(s), nil), + pattern: pattern, } } -type sliceIterator struct { - i int +func NewEmpty(pattern string) *PatternIter { + return &PatternIter{ + CloseIterator: iter.WithClose(iter.NewEmptyIter[logproto.PatternSample](), nil), + pattern: pattern, + } +} + +type PatternIter struct { + iter.CloseIterator[logproto.PatternSample] pattern string - values []logproto.PatternSample } -func (s *sliceIterator) Next() bool { - s.i++ - return s.i < len(s.values) -} - -func (s *sliceIterator) Pattern() string { +func (s *PatternIter) Pattern() string { return s.pattern } -func (s *sliceIterator) At() logproto.PatternSample { - return s.values[s.i] -} - -func (s *sliceIterator) Error() error { - return nil -} - -func (s *sliceIterator) Close() error { - return nil -} - -type emptyIterator struct { - pattern string -} - -func (e *emptyIterator) Next() bool { - return false -} - -func (e *emptyIterator) Pattern() string { - return e.pattern -} - -func (e *emptyIterator) At() logproto.PatternSample { - return logproto.PatternSample{} -} - -func (e *emptyIterator) Error() error { - return nil -} - -func (e *emptyIterator) Close() error { - return nil -} - type nonOverlappingIterator struct { iterators []Iterator curr Iterator @@ -116,11 +73,11 @@ func (i *nonOverlappingIterator) Pattern() string { return i.pattern } -func (i *nonOverlappingIterator) Error() error { - if i.curr == nil { - return nil +func (i *nonOverlappingIterator) Err() error { + if i.curr != nil { + return i.curr.Err() } - return i.curr.Error() + return nil } func (i *nonOverlappingIterator) Close() error { diff --git a/pkg/pattern/iter/iterator_test.go b/pkg/pattern/iter/iterator_test.go index b327800575..3d14b2550c 100644 --- a/pkg/pattern/iter/iterator_test.go +++ b/pkg/pattern/iter/iterator_test.go @@ -65,8 +65,8 @@ func slice(it Iterator) []patternSample { sample: it.At(), }) } - if it.Error() != nil { - panic(it.Error()) + if it.Err() != nil { + panic(it.Err()) } return samples } diff --git a/pkg/pattern/iter/merge_pattern.go b/pkg/pattern/iter/merge_pattern.go index 3b0e07e33b..cebae37c80 100644 --- a/pkg/pattern/iter/merge_pattern.go +++ b/pkg/pattern/iter/merge_pattern.go @@ -78,7 +78,7 @@ func (m *mergeIterator) At() logproto.PatternSample { return m.current.sample } -func (m *mergeIterator) Error() error { +func (m *mergeIterator) Err() error { return nil } diff --git a/pkg/pattern/iter/query_client.go b/pkg/pattern/iter/query_client.go index 8bb82b9be7..d970b43f79 100644 --- a/pkg/pattern/iter/query_client.go +++ b/pkg/pattern/iter/query_client.go @@ -47,7 +47,7 @@ func (i *queryClientIterator) At() logproto.PatternSample { return i.curr.At() } -func (i *queryClientIterator) Error() error { +func (i *queryClientIterator) Err() error { return i.err }