mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
fix(engine): align range-query evaluation timestamps to the step grid (#20904)
Aligns startTs down to the nearest epoch-aligned step boundary, replicating the behavior of the old engine. Without this alignment, a `sum by(x)(count_over_time({k="v"}[30m]))` query returned mismatched results between the old and the new engine.
This commit is contained in:
@@ -39,7 +39,7 @@ func BuildPlanWithDeletes(ctx context.Context, params logql.Params, deletes []*d
|
||||
|
||||
switch e := params.GetExpression().(type) {
|
||||
case syntax.LogSelectorExpr:
|
||||
value, err = buildPlanForLogQuery(ctx, e, params, false, 0, deletes)
|
||||
value, err = buildPlanForLogQuery(ctx, e, params, false, 0, params.Start(), deletes)
|
||||
case syntax.SampleExpr:
|
||||
value, err = buildPlanForSampleQuery(ctx, e, params, deletes)
|
||||
default:
|
||||
@@ -58,15 +58,33 @@ func BuildPlanWithDeletes(ctx context.Context, params logql.Params, deletes []*d
|
||||
return builder.ToPlan()
|
||||
}
|
||||
|
||||
// alignStartToStepGrid floors start to the closest epoch-aligned step boundary
// at or before it, matching Prometheus's standard behaviour for range queries.
//
// If step is zero or negative (a zero step denotes an instant query), or if
// start already lies on a step boundary, start is returned unchanged.
//
// The returned time keeps start's location. start must be representable as
// int64 nanoseconds since the Unix epoch (see [time.Time.UnixNano]).
func alignStartToStepGrid(start time.Time, step time.Duration) time.Time {
	if step <= 0 {
		return start
	}

	stepNs := step.Nanoseconds()
	rem := start.UnixNano() % stepNs
	if rem == 0 {
		return start
	}
	if rem < 0 {
		// Go's % truncates toward zero, so a pre-epoch start yields a negative
		// remainder. Shift it into (0, stepNs) so the subtraction below still
		// rounds down instead of up.
		rem += stepNs
	}

	// Add (rather than rebuilding via time.Unix) preserves start's location.
	return start.Add(-time.Duration(rem))
}
|
||||
|
||||
// buildPlanForLogQuery builds logical plan operations by traversing [syntax.LogSelectorExpr]
|
||||
// isMetricQuery should be set to true if this expr is encountered when processing a [syntax.SampleExpr].
|
||||
// rangeInterval should be set to a non-zero value if the query contains [$range].
|
||||
// queryStart is the effective start time for the query; for range queries it
|
||||
// must already be aligned to the step grid via [alignStartToStepGrid].
|
||||
func buildPlanForLogQuery(
|
||||
ctx context.Context,
|
||||
expr syntax.LogSelectorExpr,
|
||||
params logql.Params,
|
||||
isMetricQuery bool,
|
||||
rangeInterval time.Duration,
|
||||
queryStart time.Time,
|
||||
deletes []*deletion.Request,
|
||||
) (Value, error) {
|
||||
var (
|
||||
@@ -149,7 +167,7 @@ func buildPlanForLogQuery(
|
||||
}
|
||||
|
||||
// SELECT -> Filter
|
||||
start := params.Start()
|
||||
start := queryStart
|
||||
end := params.End()
|
||||
// extend search by rangeInterval to be able to include entries belonging to the [$range] interval.
|
||||
for _, value := range convertQueryRangeToPredicates(start.Add(-rangeInterval), end) {
|
||||
@@ -287,8 +305,9 @@ func walkRangeAggregation(e *syntax.RangeAggregationExpr, wc *walkContext) (Valu
|
||||
}
|
||||
|
||||
rangeInterval := e.Left.Interval
|
||||
alignedStart := alignStartToStepGrid(wc.params.Start(), wc.params.Step())
|
||||
|
||||
logQuery, err := buildPlanForLogQuery(wc.ctx, logSelectorExpr, wc.params, true, rangeInterval, wc.deletes)
|
||||
logQuery, err := buildPlanForLogQuery(wc.ctx, logSelectorExpr, wc.params, true, rangeInterval, alignedStart, wc.deletes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -365,7 +384,7 @@ func walkRangeAggregation(e *syntax.RangeAggregationExpr, wc *walkContext) (Valu
|
||||
}
|
||||
|
||||
builder = builder.RangeAggregation(
|
||||
convertGrouping(e.Grouping), rangeAggType, wc.params.Start(), wc.params.End(), wc.params.Step(), rangeInterval,
|
||||
convertGrouping(e.Grouping), rangeAggType, alignedStart, wc.params.End(), wc.params.Step(), rangeInterval,
|
||||
)
|
||||
|
||||
switch e.Operation {
|
||||
|
||||
@@ -1087,6 +1087,77 @@ RETURN %22
|
||||
}
|
||||
}
|
||||
|
||||
func TestAlignStartToStepGrid(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
start time.Time
|
||||
step time.Duration
|
||||
want time.Time
|
||||
}{
|
||||
{
|
||||
name: "zero step returns original start (instant query)",
|
||||
start: time.Unix(50, 0),
|
||||
step: 0,
|
||||
want: time.Unix(50, 0),
|
||||
},
|
||||
{
|
||||
name: "already aligned start is unchanged",
|
||||
start: time.Unix(100, 0),
|
||||
step: 100 * time.Second,
|
||||
want: time.Unix(100, 0),
|
||||
},
|
||||
{
|
||||
name: "non-aligned start is floored to step boundary",
|
||||
start: time.Unix(50, 0),
|
||||
step: 100 * time.Second,
|
||||
want: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
name: "start just past a step boundary",
|
||||
start: time.Unix(101, 0),
|
||||
step: 100 * time.Second,
|
||||
want: time.Unix(100, 0),
|
||||
},
|
||||
{
|
||||
name: "sub-second step alignment",
|
||||
start: time.Unix(0, 750_000_000), // 750ms
|
||||
step: 500 * time.Millisecond,
|
||||
want: time.Unix(0, 500_000_000), // 500ms
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := alignStartToStepGrid(tc.start, tc.step)
|
||||
require.Equal(t, tc.want.UnixNano(), got.UnixNano())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestRangeAggregationStepAlignment verifies that the logical planner aligns
|
||||
// the start timestamp of a RangeAggregation node to the step grid, matching
|
||||
// Prometheus's standard behaviour for range queries.
|
||||
func TestRangeAggregationStepAlignment(t *testing.T) {
|
||||
// start=50s, step=100s → aligned start = 0s (floor(50/100)*100)
|
||||
q := &query{
|
||||
statement: `count_over_time({app="test"}[100s])`,
|
||||
start: 50,
|
||||
end: 250,
|
||||
step: 100 * time.Second,
|
||||
direction: logproto.BACKWARD,
|
||||
limit: 1000,
|
||||
}
|
||||
plan, err := BuildPlan(context.Background(), q)
|
||||
require.NoError(t, err)
|
||||
|
||||
planStr := plan.String()
|
||||
// The RANGE_AGGREGATION node must show the aligned start (epoch 0 = 1970-01-01T00:00:00Z),
|
||||
// not the raw start (50s = 1970-01-01T00:00:50Z).
|
||||
require.Contains(t, planStr, "start_ts=1970-01-01T00:00:00Z",
|
||||
"RangeAggregation start should be aligned to step grid; plan:\n%s", planStr)
|
||||
require.Contains(t, planStr, "step=1m40s",
|
||||
"step should be preserved in the plan; plan:\n%s", planStr)
|
||||
}
|
||||
|
||||
// Helper to build a plan from predicates and return the SSA string
|
||||
func buildPlanFromPredicates(t *testing.T, predicates []Value, selector Value) string {
|
||||
t.Helper()
|
||||
|
||||
Reference in New Issue
Block a user