feat: Add step param to Patterns Query API (#12703)

This commit is contained in:
benclive
2024-04-25 18:33:47 +01:00
committed by GitHub
parent 3bf2d1fea0
commit 7b8533e435
12 changed files with 343 additions and 100 deletions

View File

@@ -9,6 +9,7 @@ import (
func ParsePatternsQuery(r *http.Request) (*logproto.QueryPatternsRequest, error) {
req := &logproto.QueryPatternsRequest{}
req.Query = query(r)
start, end, err := bounds(r)
if err != nil {
return nil, err
@@ -16,6 +17,19 @@ func ParsePatternsQuery(r *http.Request) (*logproto.QueryPatternsRequest, error)
req.Start = start
req.End = end
req.Query = query(r)
calculatedStep, err := step(r, start, end)
if err != nil {
return nil, err
}
if calculatedStep <= 0 {
return nil, errZeroOrNegativeStep
}
// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (req.End.Sub(req.Start) / calculatedStep) > 11000 {
return nil, errStepTooSmall
}
req.Step = calculatedStep.Milliseconds()
return req, nil
}

View File

@@ -0,0 +1,122 @@
package loghttp
import (
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logproto"
)
func TestParsePatternsQuery(t *testing.T) {
t.Parallel()
tests := []struct {
name string
path string
want *logproto.QueryPatternsRequest
wantErr bool
}{
{
name: "should correctly parse valid params",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=5s",
want: &logproto.QueryPatternsRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (5 * time.Second).Milliseconds(),
},
},
{
name: "should default empty step param to sensible step for the range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000",
want: &logproto.QueryPatternsRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (14 * time.Second).Milliseconds(),
},
},
{
name: "should default start to zero for empty start param",
path: "/loki/api/v1/patterns?query={}&end=3600000000000",
want: &logproto.QueryPatternsRequest{
Query: "{}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: (14 * time.Second).Milliseconds(),
},
},
{
name: "should accept step with no units as seconds",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10",
want: &logproto.QueryPatternsRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (10 * time.Second).Milliseconds(),
},
},
{
name: "should accept step as string duration in seconds",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=15s",
want: &logproto.QueryPatternsRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (15 * time.Second).Milliseconds(),
},
},
{
name: "should correctly parse long duration for step",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10h",
want: &logproto.QueryPatternsRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (10 * time.Hour).Milliseconds(),
},
},
{
name: "should reject negative step value",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=-5s",
want: nil,
wantErr: true,
},
{
name: "should reject very small step for big range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=50ms",
want: nil,
wantErr: true,
},
{
name: "should accept very small step for small range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=110000000000&step=50ms",
want: &logproto.QueryPatternsRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(110, 0),
Step: (50 * time.Millisecond).Milliseconds(),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, tt.path, nil)
require.NoError(t, err)
err = req.ParseForm()
require.NoError(t, err)
got, err := ParsePatternsQuery(req)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
assert.Equalf(t, tt.want, got, "Incorrect response from input path: %s", tt.path)
})
}
}

View File

@@ -335,6 +335,7 @@ func (m *VolumeRequest) LogToSpan(sp opentracing.Span) {
otlog.String("query", m.GetQuery()),
otlog.String("start", timestamp.Time(int64(m.From)).String()),
otlog.String("end", timestamp.Time(int64(m.Through)).String()),
otlog.String("step", time.Duration(m.Step).String()),
)
}
@@ -448,8 +449,6 @@ func (m *ShardsRequest) LogToSpan(sp opentracing.Span) {
func (m *QueryPatternsRequest) GetCachingOptions() (res definitions.CachingOptions) { return }
func (m *QueryPatternsRequest) GetStep() int64 { return 0 }
func (m *QueryPatternsRequest) WithStartEnd(start, end time.Time) definitions.Request {
clone := *m
clone.Start = start
@@ -469,9 +468,10 @@ func (m *QueryPatternsRequest) WithStartEndForCache(start, end time.Time) result
func (m *QueryPatternsRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
otlog.String("query", m.GetQuery()),
otlog.String("start", m.Start.String()),
otlog.String("end", m.End.String()),
otlog.String("query", m.GetQuery()),
otlog.String("step", time.Duration(m.Step).String()),
}
sp.LogFields(fields...)
}

View File

@@ -39,6 +39,7 @@ type QueryPatternsRequest struct {
Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
Start time.Time `protobuf:"bytes,2,opt,name=start,proto3,stdtime" json:"start"`
End time.Time `protobuf:"bytes,3,opt,name=end,proto3,stdtime" json:"end"`
Step int64 `protobuf:"varint,4,opt,name=step,proto3" json:"step,omitempty"`
}
func (m *QueryPatternsRequest) Reset() { *m = QueryPatternsRequest{} }
@@ -94,6 +95,13 @@ func (m *QueryPatternsRequest) GetEnd() time.Time {
return time.Time{}
}
func (m *QueryPatternsRequest) GetStep() int64 {
if m != nil {
return m.Step
}
return 0
}
type QueryPatternsResponse struct {
Series []*PatternSeries `protobuf:"bytes,1,rep,name=series,proto3" json:"series,omitempty"`
}
@@ -242,37 +250,38 @@ func init() {
func init() { proto.RegisterFile("pkg/logproto/pattern.proto", fileDescriptor_aaf4192acc66a4ea) }
var fileDescriptor_aaf4192acc66a4ea = []byte{
// 470 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x31, 0x6f, 0xd4, 0x30,
0x14, 0x8e, 0x1b, 0xae, 0xd7, 0xba, 0x62, 0x31, 0x57, 0x88, 0x82, 0xe4, 0x9c, 0xb2, 0x70, 0x53,
0x0c, 0x57, 0x09, 0x24, 0xc6, 0x9b, 0x18, 0x40, 0x2a, 0x81, 0x09, 0xc1, 0x90, 0x6b, 0x5d, 0xe7,
0xd4, 0x38, 0x4e, 0x63, 0xbb, 0x12, 0x1b, 0x3f, 0xe1, 0x7e, 0x02, 0x23, 0x3f, 0xa5, 0xe3, 0x8d,
0x15, 0x43, 0xe1, 0x72, 0x0b, 0x63, 0x7f, 0x02, 0x8a, 0xed, 0xf4, 0xae, 0x15, 0x1d, 0x58, 0x12,
0xbf, 0xf7, 0x7d, 0xef, 0xf3, 0xf7, 0xde, 0x33, 0x0c, 0xab, 0x53, 0x46, 0x0a, 0xc1, 0xaa, 0x5a,
0x28, 0x41, 0xaa, 0x4c, 0x29, 0x5a, 0x97, 0x89, 0x89, 0xd0, 0x4e, 0x97, 0x0f, 0x07, 0x4c, 0x30,
0x61, 0x29, 0xed, 0xc9, 0xe2, 0x61, 0xc4, 0x84, 0x60, 0x05, 0x25, 0x26, 0x9a, 0xea, 0x13, 0xa2,
0x66, 0x9c, 0x4a, 0x95, 0xf1, 0xca, 0x11, 0x9e, 0xde, 0x12, 0xef, 0x0e, 0x0e, 0x7c, 0xd4, 0x82,
0x95, 0x96, 0xb9, 0xf9, 0xd8, 0x64, 0xfc, 0x1d, 0xc0, 0xc1, 0x7b, 0x4d, 0xeb, 0xaf, 0x87, 0xd6,
0x89, 0x4c, 0xe9, 0x99, 0xa6, 0x52, 0xa1, 0x01, 0xec, 0x9d, 0xb5, 0xf9, 0x00, 0x0c, 0xc1, 0x68,
0x37, 0xb5, 0x01, 0x7a, 0x0d, 0x7b, 0x52, 0x65, 0xb5, 0x0a, 0xb6, 0x86, 0x60, 0xb4, 0x37, 0x0e,
0x13, 0xeb, 0x28, 0xe9, 0x1c, 0x25, 0x1f, 0x3b, 0x47, 0x93, 0x9d, 0x8b, 0xab, 0xc8, 0x9b, 0xff,
0x8a, 0x40, 0x6a, 0x4b, 0xd0, 0x4b, 0xe8, 0xd3, 0xf2, 0x38, 0xf0, 0xff, 0xa3, 0xb2, 0x2d, 0x88,
0xdf, 0xc0, 0xfd, 0x3b, 0x0e, 0x65, 0x25, 0x4a, 0x49, 0x11, 0x81, 0xdb, 0x92, 0xd6, 0x33, 0x2a,
0x03, 0x30, 0xf4, 0x47, 0x7b, 0xe3, 0x27, 0xc9, 0x4d, 0xc7, 0x8e, 0xfb, 0xc1, 0xc0, 0xa9, 0xa3,
0xc5, 0x9f, 0xe1, 0xc3, 0x5b, 0x00, 0x0a, 0x60, 0xdf, 0x6d, 0xc0, 0xb5, 0xd9, 0x85, 0xe8, 0x05,
0xec, 0xcb, 0x8c, 0x57, 0x05, 0x95, 0xc1, 0xd6, 0x7d, 0xe2, 0x06, 0x4f, 0x3b, 0x5e, 0xac, 0xd6,
0xea, 0x26, 0x83, 0xde, 0xc1, 0xdd, 0x9b, 0x05, 0x19, 0x7d, 0x7f, 0x42, 0xda, 0xd6, 0x7e, 0x5e,
0x45, 0xcf, 0xd8, 0x4c, 0xe5, 0x7a, 0x9a, 0x1c, 0x09, 0xde, 0x6e, 0x93, 0x53, 0x95, 0x53, 0x2d,
0xc9, 0x91, 0xe0, 0x5c, 0x94, 0x84, 0x8b, 0x63, 0x5a, 0x98, 0x81, 0xa4, 0x6b, 0x85, 0x76, 0x23,
0xe7, 0x59, 0xa1, 0xa9, 0x99, 0xbd, 0x9f, 0xda, 0x60, 0x3c, 0x07, 0xb0, 0xef, 0xae, 0x45, 0xaf,
0xe0, 0x83, 0x43, 0x2d, 0x73, 0xb4, 0xbf, 0xe1, 0x55, 0xcb, 0xdc, 0xad, 0x34, 0x7c, 0x7c, 0x37,
0x6d, 0xe7, 0x18, 0x7b, 0xe8, 0x2d, 0xec, 0x99, 0x11, 0x23, 0xbc, 0xa6, 0xfc, 0xeb, 0x55, 0x84,
0xd1, 0xbd, 0x78, 0xa7, 0xf5, 0x1c, 0x4c, 0xbe, 0x2c, 0x96, 0xd8, 0xbb, 0x5c, 0x62, 0xef, 0x7a,
0x89, 0xc1, 0xb7, 0x06, 0x83, 0x1f, 0x0d, 0x06, 0x17, 0x0d, 0x06, 0x8b, 0x06, 0x83, 0xdf, 0x0d,
0x06, 0x7f, 0x1a, 0xec, 0x5d, 0x37, 0x18, 0xcc, 0x57, 0xd8, 0x5b, 0xac, 0xb0, 0x77, 0xb9, 0xc2,
0xde, 0xa7, 0xcd, 0x91, 0xb0, 0x3a, 0x3b, 0xc9, 0xca, 0x8c, 0x14, 0xe2, 0x74, 0x46, 0xce, 0x0f,
0xc8, 0xe6, 0xb3, 0x9e, 0x6e, 0x9b, 0xdf, 0xc1, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xec, 0xd6,
0xbc, 0xfc, 0x4a, 0x03, 0x00, 0x00,
// 483 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xb1, 0x6e, 0xd3, 0x40,
0x18, 0xf6, 0xd5, 0x49, 0xd3, 0x5e, 0xc5, 0x72, 0xa4, 0x60, 0x19, 0xe9, 0x1c, 0x79, 0x21, 0x93,
0x0f, 0x52, 0x09, 0x24, 0xc6, 0x4c, 0x0c, 0x20, 0x15, 0xc3, 0x84, 0x60, 0x70, 0xda, 0xbf, 0xb6,
0x55, 0xdb, 0xe7, 0xfa, 0xee, 0x2a, 0xb1, 0xf1, 0x08, 0x79, 0x0c, 0x1e, 0x80, 0x87, 0xe8, 0x98,
0xb1, 0x62, 0x28, 0xc4, 0x59, 0x18, 0xfb, 0x08, 0xc8, 0x77, 0x76, 0x93, 0x56, 0x74, 0xe8, 0x92,
0xdc, 0xff, 0x7f, 0xdf, 0xff, 0xf9, 0xbb, 0xff, 0x3b, 0xec, 0x96, 0xa7, 0x31, 0xcb, 0x78, 0x5c,
0x56, 0x5c, 0x72, 0x56, 0x46, 0x52, 0x42, 0x55, 0x04, 0xba, 0x22, 0x3b, 0x5d, 0xdf, 0x1d, 0xc6,
0x3c, 0xe6, 0x86, 0xd2, 0x9c, 0x0c, 0xee, 0x7a, 0x31, 0xe7, 0x71, 0x06, 0x4c, 0x57, 0x33, 0x75,
0xc2, 0x64, 0x9a, 0x83, 0x90, 0x51, 0x5e, 0xb6, 0x84, 0x67, 0xb7, 0xc4, 0xbb, 0x43, 0x0b, 0x3e,
0x6e, 0xc0, 0x52, 0x89, 0x44, 0xff, 0x98, 0xa6, 0xff, 0x13, 0xe1, 0xe1, 0x07, 0x05, 0xd5, 0xb7,
0x43, 0xe3, 0x44, 0x84, 0x70, 0xa6, 0x40, 0x48, 0x32, 0xc4, 0xfd, 0xb3, 0xa6, 0xef, 0xa0, 0x11,
0x1a, 0xef, 0x86, 0xa6, 0x20, 0x6f, 0x70, 0x5f, 0xc8, 0xa8, 0x92, 0xce, 0xd6, 0x08, 0x8d, 0xf7,
0x26, 0x6e, 0x60, 0x1c, 0x05, 0x9d, 0xa3, 0xe0, 0x53, 0xe7, 0x68, 0xba, 0x73, 0x71, 0xe5, 0x59,
0xf3, 0xdf, 0x1e, 0x0a, 0xcd, 0x08, 0x79, 0x85, 0x6d, 0x28, 0x8e, 0x1d, 0xfb, 0x01, 0x93, 0xcd,
0x00, 0x21, 0xb8, 0x27, 0x24, 0x94, 0x4e, 0x6f, 0x84, 0xc6, 0x76, 0xa8, 0xcf, 0xfe, 0x5b, 0xbc,
0x7f, 0xc7, 0xb5, 0x28, 0x79, 0x21, 0x80, 0x30, 0xbc, 0x2d, 0xa0, 0x4a, 0x41, 0x38, 0x68, 0x64,
0x8f, 0xf7, 0x26, 0x4f, 0x83, 0x9b, 0x2d, 0xb4, 0xdc, 0x8f, 0x1a, 0x0e, 0x5b, 0x9a, 0xff, 0x05,
0x3f, 0xba, 0x05, 0x10, 0x07, 0x0f, 0xda, 0x54, 0xda, 0xab, 0x77, 0x25, 0x79, 0x89, 0x07, 0x22,
0xca, 0xcb, 0x0c, 0x84, 0xb3, 0x75, 0x9f, 0xb8, 0xc6, 0xc3, 0x8e, 0xe7, 0xcb, 0xb5, 0xba, 0xee,
0x90, 0xf7, 0x78, 0xf7, 0x26, 0x34, 0xad, 0x6f, 0x4f, 0x59, 0x73, 0xdd, 0x5f, 0x57, 0xde, 0xf3,
0x38, 0x95, 0x89, 0x9a, 0x05, 0x47, 0x3c, 0x6f, 0x12, 0xce, 0x41, 0x26, 0xa0, 0x04, 0x3b, 0xe2,
0x79, 0xce, 0x0b, 0x96, 0xf3, 0x63, 0xc8, 0xf4, 0x92, 0xc2, 0xb5, 0x42, 0x93, 0xd2, 0x79, 0x94,
0x29, 0xd0, 0x79, 0xd8, 0xa1, 0x29, 0x26, 0x73, 0x84, 0x07, 0xed, 0x67, 0xc9, 0x6b, 0xdc, 0x3b,
0x54, 0x22, 0x21, 0xfb, 0x1b, 0x5e, 0x95, 0x48, 0xda, 0x98, 0xdd, 0x27, 0x77, 0xdb, 0x66, 0x8f,
0xbe, 0x45, 0xde, 0xe1, 0xbe, 0x5e, 0x31, 0xa1, 0x6b, 0xca, 0xff, 0x5e, 0x8a, 0xeb, 0xdd, 0x8b,
0x77, 0x5a, 0x2f, 0xd0, 0xf4, 0xeb, 0x62, 0x49, 0xad, 0xcb, 0x25, 0xb5, 0xae, 0x97, 0x14, 0x7d,
0xaf, 0x29, 0xfa, 0x51, 0x53, 0x74, 0x51, 0x53, 0xb4, 0xa8, 0x29, 0xfa, 0x53, 0x53, 0xf4, 0xb7,
0xa6, 0xd6, 0x75, 0x4d, 0xd1, 0x7c, 0x45, 0xad, 0xc5, 0x8a, 0x5a, 0x97, 0x2b, 0x6a, 0x7d, 0xde,
0x5c, 0x49, 0x5c, 0x45, 0x27, 0x51, 0x11, 0xb1, 0x8c, 0x9f, 0xa6, 0xec, 0xfc, 0x80, 0x6d, 0x3e,
0xf5, 0xd9, 0xb6, 0xfe, 0x3b, 0xf8, 0x17, 0x00, 0x00, 0xff, 0xff, 0x3b, 0x4f, 0x5c, 0x50, 0x5e,
0x03, 0x00, 0x00,
}
func (this *QueryPatternsRequest) Equal(that interface{}) bool {
@@ -303,6 +312,9 @@ func (this *QueryPatternsRequest) Equal(that interface{}) bool {
if !this.End.Equal(that1.End) {
return false
}
if this.Step != that1.Step {
return false
}
return true
}
func (this *QueryPatternsResponse) Equal(that interface{}) bool {
@@ -397,11 +409,12 @@ func (this *QueryPatternsRequest) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s := make([]string, 0, 8)
s = append(s, "&logproto.QueryPatternsRequest{")
s = append(s, "Query: "+fmt.Sprintf("%#v", this.Query)+",\n")
s = append(s, "Start: "+fmt.Sprintf("%#v", this.Start)+",\n")
s = append(s, "End: "+fmt.Sprintf("%#v", this.End)+",\n")
s = append(s, "Step: "+fmt.Sprintf("%#v", this.Step)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@@ -614,6 +627,11 @@ func (m *QueryPatternsRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.Step != 0 {
i = encodeVarintPattern(dAtA, i, uint64(m.Step))
i--
dAtA[i] = 0x20
}
n1, err1 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.End, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.End):])
if err1 != nil {
return 0, err1
@@ -779,6 +797,9 @@ func (m *QueryPatternsRequest) Size() (n int) {
n += 1 + l + sovPattern(uint64(l))
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.End)
n += 1 + l + sovPattern(uint64(l))
if m.Step != 0 {
n += 1 + sovPattern(uint64(m.Step))
}
return n
}
@@ -845,6 +866,7 @@ func (this *QueryPatternsRequest) String() string {
`Query:` + fmt.Sprintf("%v", this.Query) + `,`,
`Start:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Start), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`End:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.End), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`Step:` + fmt.Sprintf("%v", this.Step) + `,`,
`}`,
}, "")
return s
@@ -1026,6 +1048,25 @@ func (m *QueryPatternsRequest) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Step", wireType)
}
m.Step = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPattern
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Step |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipPattern(dAtA[iNdEx:])

View File

@@ -24,6 +24,7 @@ message QueryPatternsRequest {
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
int64 step = 4;
}
message QueryPatternsResponse {

View File

@@ -11,7 +11,7 @@ import (
)
const (
timeResolution = model.Time(int64(time.Second*10) / 1e6)
TimeResolution = model.Time(int64(time.Second*10) / 1e6)
defaultVolumeSize = 500
@@ -25,7 +25,7 @@ type Chunk struct {
}
func newChunk(ts model.Time) Chunk {
maxSize := int(maxChunkTime.Nanoseconds()/timeResolution.UnixNano()) + 1
maxSize := int(maxChunkTime.Nanoseconds()/TimeResolution.UnixNano()) + 1
v := Chunk{Samples: make([]logproto.PatternSample, 1, maxSize)}
v.Samples[0] = logproto.PatternSample{
Timestamp: ts,
@@ -43,9 +43,9 @@ func (c Chunk) spaceFor(ts model.Time) bool {
}
// ForRange returns samples with only the values
// in the given range [start:end).
// start and end are in milliseconds since epoch.
func (c Chunk) ForRange(start, end model.Time) []logproto.PatternSample {
// in the given range [start:end) and aggregates them by step duration.
// start and end are in milliseconds since epoch. step is a duration in milliseconds.
func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample {
if len(c.Samples) == 0 {
return nil
}
@@ -66,11 +66,36 @@ func (c Chunk) ForRange(start, end model.Time) []logproto.PatternSample {
return c.Samples[i].Timestamp >= end
})
}
return c.Samples[lo:hi]
if step == TimeResolution {
return c.Samples[lo:hi]
}
// Re-scale samples into step-sized buckets
currentStep := truncateTimestamp(c.Samples[lo].Timestamp, step)
aggregatedSamples := make([]logproto.PatternSample, 0, ((c.Samples[hi-1].Timestamp-currentStep)/step)+1)
aggregatedSamples = append(aggregatedSamples, logproto.PatternSample{
Timestamp: currentStep,
Value: 0,
})
for _, sample := range c.Samples[lo:hi] {
if sample.Timestamp >= currentStep+step {
stepForSample := truncateTimestamp(sample.Timestamp, step)
for i := currentStep + step; i <= stepForSample; i += step {
aggregatedSamples = append(aggregatedSamples, logproto.PatternSample{
Timestamp: i,
Value: 0,
})
}
currentStep = stepForSample
}
aggregatedSamples[len(aggregatedSamples)-1].Value += sample.Value
}
return aggregatedSamples
}
func (c *Chunks) Add(ts model.Time) {
t := truncateTimestamp(ts)
t := truncateTimestamp(ts, TimeResolution)
if len(*c) == 0 {
*c = append(*c, newChunk(t))
@@ -91,10 +116,10 @@ func (c *Chunks) Add(ts model.Time) {
})
}
func (c Chunks) Iterator(pattern string, from, through model.Time) iter.Iterator {
func (c Chunks) Iterator(pattern string, from, through, step model.Time) iter.Iterator {
iters := make([]iter.Iterator, 0, len(c))
for _, chunk := range c {
samples := chunk.ForRange(from, through)
samples := chunk.ForRange(from, through, step)
if len(samples) == 0 {
continue
}
@@ -173,4 +198,4 @@ func (c *Chunks) size() int {
return size
}
func truncateTimestamp(ts model.Time) model.Time { return ts - ts%timeResolution }
func truncateTimestamp(ts, step model.Time) model.Time { return ts - ts%step }

View File

@@ -13,24 +13,24 @@ import (
func TestAdd(t *testing.T) {
cks := Chunks{}
cks.Add(timeResolution + 1)
cks.Add(timeResolution + 2)
cks.Add(2*timeResolution + 1)
cks.Add(TimeResolution + 1)
cks.Add(TimeResolution + 2)
cks.Add(2*TimeResolution + 1)
require.Equal(t, 1, len(cks))
require.Equal(t, 2, len(cks[0].Samples))
cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + timeResolution + 1)
cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + TimeResolution + 1)
require.Equal(t, 2, len(cks))
require.Equal(t, 1, len(cks[1].Samples))
}
func TestIterator(t *testing.T) {
cks := Chunks{}
cks.Add(timeResolution + 1)
cks.Add(timeResolution + 2)
cks.Add(2*timeResolution + 1)
cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + timeResolution + 1)
cks.Add(TimeResolution + 1)
cks.Add(TimeResolution + 2)
cks.Add(2*TimeResolution + 1)
cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + TimeResolution + 1)
it := cks.Iterator("test", model.Time(0), model.Time(time.Hour.Nanoseconds()))
it := cks.Iterator("test", model.Time(0), model.Time(time.Hour.Nanoseconds()), TimeResolution)
require.NotNil(t, it)
var samples []logproto.PatternSample
@@ -64,9 +64,9 @@ func TestForRange(t *testing.T) {
{
name: "No Overlap",
c: &Chunk{Samples: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 3, Value: 4},
{Timestamp: 5, Value: 6},
{Timestamp: 2, Value: 2},
{Timestamp: 4, Value: 4},
{Timestamp: 6, Value: 6},
}},
start: 10,
end: 20,
@@ -75,86 +75,120 @@ func TestForRange(t *testing.T) {
{
name: "Complete Overlap",
c: &Chunk{Samples: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 3, Value: 4},
{Timestamp: 5, Value: 6},
{Timestamp: 2, Value: 2},
{Timestamp: 4, Value: 4},
{Timestamp: 6, Value: 6},
}},
start: 0,
end: 10,
expected: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 3, Value: 4},
{Timestamp: 5, Value: 6},
{Timestamp: 2, Value: 2},
{Timestamp: 4, Value: 4},
{Timestamp: 6, Value: 6},
},
},
{
name: "Partial Overlap",
c: &Chunk{Samples: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 3, Value: 4},
{Timestamp: 5, Value: 6},
{Timestamp: 2, Value: 2},
{Timestamp: 4, Value: 4},
{Timestamp: 6, Value: 6},
}},
start: 2,
end: 4,
expected: []logproto.PatternSample{{Timestamp: 3, Value: 4}},
start: 3,
end: 5,
expected: []logproto.PatternSample{{Timestamp: 4, Value: 4}},
},
{
name: "Single Element in Range",
c: &Chunk{Samples: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 3, Value: 4},
{Timestamp: 5, Value: 6},
{Timestamp: 2, Value: 2},
{Timestamp: 4, Value: 4},
{Timestamp: 6, Value: 6},
}},
start: 3,
end: 4,
expected: []logproto.PatternSample{{Timestamp: 3, Value: 4}},
start: 4,
end: 5,
expected: []logproto.PatternSample{{Timestamp: 4, Value: 4}},
},
{
name: "Start Before First Element",
c: &Chunk{Samples: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 3, Value: 4},
{Timestamp: 5, Value: 6},
{Timestamp: 2, Value: 2},
{Timestamp: 4, Value: 4},
{Timestamp: 6, Value: 6},
}},
start: 0,
end: 4,
end: 5,
expected: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 3, Value: 4},
{Timestamp: 2, Value: 2},
{Timestamp: 4, Value: 4},
},
},
{
name: "End After Last Element",
c: &Chunk{Samples: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 3, Value: 4},
{Timestamp: 5, Value: 6},
{Timestamp: 2, Value: 2},
{Timestamp: 4, Value: 4},
{Timestamp: 6, Value: 6},
}},
start: 4,
start: 5,
end: 10,
expected: []logproto.PatternSample{
{Timestamp: 5, Value: 6},
{Timestamp: 6, Value: 6},
},
},
{
name: "Start and End Before First Element",
c: &Chunk{Samples: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 3, Value: 4},
{Timestamp: 5, Value: 6},
{Timestamp: 2, Value: 2},
{Timestamp: 4, Value: 4},
{Timestamp: 6, Value: 6},
}},
start: 0,
end: 1,
end: 2,
expected: nil,
},
{
name: "Higher resolution samples down-sampled to preceding step bucket",
c: &Chunk{Samples: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 2, Value: 4},
{Timestamp: 3, Value: 6},
{Timestamp: 4, Value: 8},
{Timestamp: 5, Value: 10},
{Timestamp: 6, Value: 12},
}},
start: 1,
end: 6,
expected: []logproto.PatternSample{
{Timestamp: 0, Value: 2},
{Timestamp: 2, Value: 10},
{Timestamp: 4, Value: 18},
{Timestamp: 6, Value: 12},
},
},
{
name: "Low resolution samples insert 0 values for empty steps",
c: &Chunk{Samples: []logproto.PatternSample{
{Timestamp: 1, Value: 2},
{Timestamp: 5, Value: 10},
}},
start: 1,
end: 6,
expected: []logproto.PatternSample{
{Timestamp: 0, Value: 2},
{Timestamp: 2, Value: 0},
{Timestamp: 4, Value: 10},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := tc.c.ForRange(tc.start, tc.end)
result := tc.c.ForRange(tc.start, tc.end, model.Time(2))
if !reflect.DeepEqual(result, tc.expected) {
t.Errorf("Expected %v, got %v", tc.expected, result)
}
require.Equal(t, len(result), cap(result), "Returned slice wasn't created at the correct capacity")
})
}
}

View File

@@ -35,8 +35,8 @@ func (c *LogCluster) merge(samples []*logproto.PatternSample) {
c.Chunks.merge(samples)
}
func (c *LogCluster) Iterator(from, through model.Time) iter.Iterator {
return c.Chunks.Iterator(c.String(), from, through)
func (c *LogCluster) Iterator(from, through, step model.Time) iter.Iterator {
return c.Chunks.Iterator(c.String(), from, through, step)
}
func (c *LogCluster) Samples() []*logproto.PatternSample {

View File

@@ -15,6 +15,7 @@ import (
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/grafana/loki/v3/pkg/util"
)
@@ -78,10 +79,14 @@ func (i *instance) Iterator(ctx context.Context, req *logproto.QueryPatternsRequ
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
from, through := util.RoundToMilliseconds(req.Start, req.End)
step := model.Time(req.Step)
if step < drain.TimeResolution {
step = drain.TimeResolution
}
var iters []iter.Iterator
err = i.forMatchingStreams(matchers, func(s *stream) error {
iter, err := s.Iterator(ctx, from, through)
iter, err := s.Iterator(ctx, from, through, step)
if err != nil {
return err
}

View File

@@ -54,7 +54,7 @@ func (s *stream) Push(
return nil
}
func (s *stream) Iterator(_ context.Context, from, through model.Time) (iter.Iterator, error) {
func (s *stream) Iterator(_ context.Context, from, through, step model.Time) (iter.Iterator, error) {
// todo we should improve locking.
s.mtx.Lock()
defer s.mtx.Unlock()
@@ -66,7 +66,7 @@ func (s *stream) Iterator(_ context.Context, from, through model.Time) (iter.Ite
if cluster.String() == "" {
continue
}
iters = append(iters, cluster.Iterator(from, through))
iters = append(iters, cluster.Iterator(from, through, step))
}
return iter.NewMerge(iters...), nil
}

View File

@@ -34,7 +34,7 @@ func TestAddStream(t *testing.T) {
},
})
require.NoError(t, err)
it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest)
it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest, model.Time(time.Second))
require.NoError(t, err)
res, err := iter.ReadAll(it)
require.NoError(t, err)
@@ -68,7 +68,7 @@ func TestPruneStream(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, false, stream.prune(time.Hour))
it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest)
it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest, model.Time(time.Second))
require.NoError(t, err)
res, err := iter.ReadAll(it)
require.NoError(t, err)

View File

@@ -984,9 +984,10 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
return req.WithContext(ctx), nil
case *logproto.QueryPatternsRequest:
params := url.Values{
"query": []string{request.GetQuery()},
"start": []string{fmt.Sprintf("%d", request.Start.UnixNano())},
"end": []string{fmt.Sprintf("%d", request.End.UnixNano())},
"query": []string{request.GetQuery()},
"step": []string{fmt.Sprintf("%d", request.GetStep())},
}
u := &url.URL{