chore(engine): enforce retention at query time (#20497)

This commit is contained in:
Ashwanth
2026-01-22 11:11:57 +05:30
committed by GitHub
parent 805b8020df
commit 8a08023602
6 changed files with 499 additions and 15 deletions

View File

@@ -361,6 +361,12 @@ query_engine:
# CLI flag: -query-engine.storage-start-date
[storage_start_date: <time> | default = 0]
# Lifecycle of data objects in days. If set, queries falling outside of the
# retention period will not be supported. When both storage-start-date and
# storage-retention-days are set, the more restrictive of the two will apply.
# CLI flag: -query-engine.storage-retention-days
[storage_retention_days: <int> | default = 0]
# Enable routing of query splits in the query frontend to the next generation
# engine when they fall within the configured time range.
# CLI flag: -query-engine.enable-engine-router
@@ -377,6 +383,11 @@ query_engine:
# CLI flag: -query-engine.enable-delete-req-filtering
[enable_delete_req_filtering: <boolean> | default = true]
# Enforce tenant retention limits. Queries falling outside tenant's retention
# period are either adjusted or rejected.
# CLI flag: -query-engine.enforce-retention-period
[enforce_retention_period: <boolean> | default = false]
# The query_scheduler block configures the Loki query scheduler. When configured
# it separates the tenant query queues from the query-frontend.
[query_scheduler: <query_scheduler>]

View File

@@ -28,13 +28,14 @@ type Config struct {
Executor ExecutorConfig `yaml:",inline"`
Worker WorkerConfig `yaml:",inline"`
StorageLag time.Duration `yaml:"storage_lag" category:"experimental"`
StorageStartDate flagext.Time `yaml:"storage_start_date" category:"experimental"`
StorageLag time.Duration `yaml:"storage_lag" category:"experimental"`
StorageStartDate flagext.Time `yaml:"storage_start_date" category:"experimental"`
StorageRetentionDays int64 `yaml:"storage_retention_days" category:"experimental"`
EnableEngineRouter bool `yaml:"enable_engine_router" category:"experimental"`
DownstreamAddress string `yaml:"downstream_address" category:"experimental"`
EnableDeleteReqFiltering bool `yaml:"enable_delete_req_filtering" category:"experimental"`
EnableEngineRouter bool `yaml:"enable_engine_router" category:"experimental"`
DownstreamAddress string `yaml:"downstream_address" category:"experimental"`
EnableDeleteReqFiltering bool `yaml:"enable_delete_req_filtering" category:"experimental"`
EnforceRetentionPeriod bool `yaml:"enforce_retention_period" category:"experimental"`
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -53,15 +54,30 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.StorageLag, prefix+"storage-lag", 1*time.Hour, "Amount of time until data objects are available.")
f.Var(&cfg.StorageStartDate, prefix+"storage-start-date", "Initial date when data objects became available. Format YYYY-MM-DD. If not set, assume data objects are always available no matter how far back.")
f.Int64Var(&cfg.StorageRetentionDays, prefix+"storage-retention-days", 0, "Lifecycle of data objects in days. If set, queries falling outside of the retention period will not be supported. When both storage-start-date and storage-retention-days are set, the more restrictive of the two will apply.")
f.BoolVar(&cfg.EnableEngineRouter, prefix+"enable-engine-router", false, "Enable routing of query splits in the query frontend to the next generation engine when they fall within the configured time range.")
f.StringVar(&cfg.DownstreamAddress, prefix+"downstream-address", "", "Downstream address to send query splits to. This is the HTTP handler address of the query engine scheduler.")
f.BoolVar(&cfg.EnforceRetentionPeriod, prefix+"enforce-retention-period", false, "Enforce tenant retention limits. Queries falling outside tenant's retention period are either adjusted or rejected.")
f.BoolVar(&cfg.EnableDeleteReqFiltering, prefix+"enable-delete-req-filtering", true, "When enabled, query results exclude log lines that match overlapping delete requests (not just pending requests). Disable to return all logs without considering delete requests.")
}
// ValidQueryRange returns the start and end (both in UTC) of the time range
// the engine can serve.
//
// The end is the current time minus StorageLag. The start is StorageStartDate,
// unless StorageRetentionDays is set and its boundary is later, in which case
// the retention boundary is used: the more restrictive of the two applies. The
// returned start is the zero time when neither setting is configured.
func (cfg *Config) ValidQueryRange() (time.Time, time.Time) {
	startDate := time.Time(cfg.StorageStartDate).UTC()
	now := time.Now().UTC()

	if cfg.StorageRetentionDays > 0 {
		// considering start of the day for retention calculations.
		// e.g. if retention is 7 days, and today is 10th, data before 3rd is not available.
		retentionBoundary := now.Truncate(24 * time.Hour).
			Add(-time.Duration(cfg.StorageRetentionDays) * 24 * time.Hour)

		// Only move the start forward; an unset (zero) start date always
		// yields to the retention boundary.
		if startDate.IsZero() || retentionBoundary.After(startDate) {
			startDate = retentionBoundary
		}
	}

	return startDate, now.Add(-cfg.StorageLag)
}
// AdvertiseAddress determines the TCP address to advertise for accepting

View File

@@ -10,10 +10,13 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
@@ -21,9 +24,14 @@ import (
util_validation "github.com/grafana/loki/v3/pkg/util/validation"
)
// Limits is the set of limits required by the query handler: the querier
// limits plus the tenant retention settings described by RetentionLimits.
type Limits interface {
querier_limits.Limits
RetentionLimits
}
// Handler returns an [http.Handler] for serving queries. Unsupported queries
// will result in an error.
//
// It is a thin wrapper that passes engine to executorHandler as the query
// executor.
func Handler(cfg Config, logger log.Logger, engine *Engine, limits querier_limits.Limits) http.Handler {
func Handler(cfg Config, logger log.Logger, engine *Engine, limits Limits) http.Handler {
return executorHandler(cfg, logger, engine, limits)
}
@@ -32,21 +40,27 @@ type queryExecutor interface {
Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error)
}
// executorHandler builds a queryHandler around exec and wraps it with
// queryrange.NewSerializeHTTPHandler using queryrange.DefaultCodec.
//
// A retention checker is attached only when cfg.EnforceRetentionPeriod is
// set; otherwise the handler's retentionChecker stays nil.
func executorHandler(cfg Config, logger log.Logger, exec queryExecutor, limits querier_limits.Limits) http.Handler {
func executorHandler(cfg Config, logger log.Logger, exec queryExecutor, limits Limits) http.Handler {
h := &queryHandler{
cfg: cfg,
logger: logger,
exec: exec,
limits: limits,
}
// Retention enforcement is opt-in.
if cfg.EnforceRetentionPeriod {
h.retentionChecker = newRetentionChecker(limits, logger)
}
return queryrange.NewSerializeHTTPHandler(h, queryrange.DefaultCodec)
}
// queryHandler serves query requests by running them through exec.
type queryHandler struct {
	cfg    Config
	logger log.Logger
	exec   queryExecutor
	limits querier_limits.Limits

	// retentionChecker validates queries against tenant retention settings.
	// It is nil when retention enforcement is disabled.
	retentionChecker *retentionChecker
}
var _ queryrangebase.Handler = (*queryHandler)(nil)
@@ -123,6 +137,20 @@ func (h *queryHandler) execute(ctx context.Context, logger log.Logger, params lo
return logqlmodel.Result{}, httpgrpc.Error(http.StatusNotImplemented, err.Error())
}
if h.retentionChecker != nil {
checkResult := h.retentionChecker.Validate(ctx, params)
if checkResult.Error != nil {
return logqlmodel.Result{}, checkResult.Error
}
if checkResult.EmptyResponse {
return emptyResult(ctx, params)
}
// continue with adjusted params
params = checkResult.Params
}
res, err := h.exec.Execute(ctx, params)
if err != nil && errors.Is(err, ErrNotSupported) {
level.Warn(logger).Log("msg", "unsupported query", "err", err)
@@ -169,3 +197,28 @@ func (h *queryHandler) doInstantRequest(ctx context.Context, req *queryrange.Lok
}
return h.execute(ctx, logger, params)
}
// emptyResult builds a successful result that carries no data for params.
//
// The empty payload matches the expression type: a sample expression yields an
// empty matrix when the query has a step and an empty vector otherwise, and a
// log selector yields empty streams. Any other expression type is reported as
// an internal server error. Headers and warnings are taken from the metadata
// stored in ctx.
func emptyResult(ctx context.Context, params logql.Params) (logqlmodel.Result, error) {
	expr := params.GetExpression()

	var empty parser.Value
	switch expr.(type) {
	case syntax.SampleExpr:
		empty = promql.Vector{}
		if params.Step() > 0 {
			empty = promql.Matrix{}
		}
	case syntax.LogSelectorExpr:
		empty = logqlmodel.Streams{}
	default:
		return logqlmodel.Result{}, httpgrpc.Errorf(http.StatusInternalServerError, "unsupported expression type %T", expr)
	}

	meta := metadata.FromContext(ctx)
	meta.AddWarning("Query was executed using the new experimental query engine.")

	result := logqlmodel.Result{
		Data:     empty,
		Headers:  meta.Headers(),
		Warnings: meta.Warnings(),
	}
	return result, nil
}

View File

@@ -43,6 +43,11 @@ func newTestHandler(cfg Config, exec queryExecutor, limits querier_limits.Limits
}
}
// mockLimits satisfies Limits for tests by embedding both of its constituent
// interfaces.
type mockLimits struct {
querier_limits.Limits
RetentionLimits
}
func TestHandler(t *testing.T) {
cfg := Config{
Executor: ExecutorConfig{
@@ -52,7 +57,7 @@ func TestHandler(t *testing.T) {
}
logger := log.NewNopLogger()
eng := &mockEngine{}
limits := &querytest.MockLimits{}
limits := &mockLimits{}
handler := executorHandler(cfg, logger, eng, limits)
require.NotNil(t, handler)

218
pkg/engine/retention.go Normal file
View File

@@ -0,0 +1,218 @@
package engine
import (
"context"
"net/http"
"time"
"github.com/coder/quartz"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/validation"
)
// RetentionLimits provides access to tenant retention settings.
type RetentionLimits interface {
// RetentionPeriod returns the tenant's global retention period. The
// retention checker treats a value <= 0 as "not configured".
RetentionPeriod(userID string) time.Duration
// StreamRetention returns the tenant's per-stream retention rules. The
// retention checker treats an empty slice as "not configured".
StreamRetention(userID string) []validation.StreamRetention
}
// retentionChecker checks the incoming query against tenant's retention settings
// and decides how to handle it. This is a temporary solution until storage
// supports retention directly.
//
// All boundaries are computed at day granularity: "today" below is the start
// of the current UTC day as reported by the checker's clock.
//
// The following section outlines how to handle retention period:
//
// 1. Only global retention_period configured (no stream retention):
//
//	retentionBoundary = today - retentionPeriod (data older than this is expired)
//
// ┌─────────────────────────────────────┬──────────────────────────────────────────────────────────────┐
// │ query entirely within retention     │ Execute as-is                                                │
// │ (queryStart >= retentionBoundary)   │                                                              │
// ├─────────────────────────────────────┼──────────────────────────────────────────────────────────────┤
// │ query overlaps retention boundary   │ Snap query start to retention boundary to avoid reading      │
// │ (queryStart < retentionBoundary     │ data out of retention                                        │
// │ AND queryEnd > retentionBoundary)   │                                                              │
// ├─────────────────────────────────────┼──────────────────────────────────────────────────────────────┤
// │ query entirely out of retention     │ Entirely out of retention. Return empty response             │
// │ (queryEnd <= retentionBoundary)     │                                                              │
// └─────────────────────────────────────┴──────────────────────────────────────────────────────────────┘
//
// 2. Per-stream retention configured (the global boundary is only checked
// when a global retention_period is also set):
//
//	globalBoundary = today - globalRetentionPeriod
//	streamBoundary = today - smallestStreamRetentionPeriod (most restrictive)
//
// ┌─────────────────────────────────────┬──────────────────────────────────────────────────────────────┐
// │ query entirely within retention     │ Execute as-is                                                │
// │ (queryStart >= globalBoundary AND   │                                                              │
// │ queryStart >= streamBoundary)       │                                                              │
// ├─────────────────────────────────────┼──────────────────────────────────────────────────────────────┤
// │ query overlaps either boundary      │ Return 501 Not Implemented error                             │
// │ (queryStart < globalBoundary OR     │                                                              │
// │ queryStart < streamBoundary)        │                                                              │
// └─────────────────────────────────────┴──────────────────────────────────────────────────────────────┘
type retentionChecker struct {
// limits supplies the per-tenant retention settings.
limits RetentionLimits
logger log.Logger
// clock is the time source used to derive "today".
clock quartz.Clock
}
// newRetentionChecker returns a retentionChecker that reads tenant retention
// settings from limits, logs its decisions to logger, and takes the current
// time from the real clock.
func newRetentionChecker(limits RetentionLimits, logger log.Logger) *retentionChecker {
	checker := new(retentionChecker)
	checker.limits = limits
	checker.logger = logger
	checker.clock = quartz.NewReal()
	return checker
}
// RetentionCheckResult represents the result of a retention check. Callers
// should inspect Error first, then EmptyResponse, and only then use Params;
// Params is left unset when either of the other two is set.
type RetentionCheckResult struct {
// If the query start was snapped to the retention boundary, this will
// contain adjusted params. Otherwise, it returns the original params.
Params logql.Params
// EmptyResponse indicates the query should return an empty response
// because the entire range is out of retention.
EmptyResponse bool
// Error is set if the query cannot be executed due to retention constraints.
// This is used when stream retention makes the situation too complex, and
// when the tenant ID cannot be resolved from the request context.
Error error
}
// Validate checks the query described by params against the retention
// settings of the tenant found in ctx.
//
// The result carries either the params to execute (possibly with an adjusted
// start), an empty-response indicator, or an error. Failure to resolve the
// tenant ID from ctx is reported as a 400 Bad Request. When the checker has no
// limits, or the tenant has neither global nor stream retention configured,
// the original params are returned untouched.
func (r *retentionChecker) Validate(ctx context.Context, params logql.Params) RetentionCheckResult {
	unchanged := RetentionCheckResult{Params: params}
	if r.limits == nil {
		return unchanged
	}

	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		return RetentionCheckResult{
			Error: httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()),
		}
	}

	start, end := params.Start(), params.End()
	// Retention boundaries are computed at day granularity, measured from
	// the start of the current UTC day.
	dayStart := r.clock.Now().UTC().Truncate(24 * time.Hour)
	global := r.limits.RetentionPeriod(tenantID)
	perStream := r.limits.StreamRetention(tenantID)

	hasStreamRetention := len(perStream) > 0
	if !hasStreamRetention && global <= 0 {
		// Nothing configured for this tenant.
		return unchanged
	}

	boundary := dayStart.Add(-global)
	if hasStreamRetention {
		return r.handleStreamRetention(params, tenantID, start, global, boundary, perStream, dayStart)
	}

	// Only global retention configured.
	return r.handleGlobalRetention(params, tenantID, start, end, global, boundary)
}
// handleGlobalRetention decides how to run a query when only the global
// retention period applies.
//
//   - No retention configured, or the query starts at/after globalBoundary:
//     the original params are returned.
//   - The query ends at/before globalBoundary: an empty response is requested.
//   - Otherwise the query straddles the boundary and the returned params
//     report globalBoundary as their start.
//
// NOTE(review): for queries with a step, moving the start presumably also
// moves the evaluation grid to the boundary — confirm this is acceptable.
func (r *retentionChecker) handleGlobalRetention(params logql.Params, tenantID string, queryStart, queryEnd time.Time, globalRetention time.Duration, globalBoundary time.Time) RetentionCheckResult {
	switch {
	case globalRetention <= 0, !queryStart.Before(globalBoundary):
		// Nothing to enforce, or the query is entirely within retention.
		return RetentionCheckResult{Params: params}

	case !queryEnd.After(globalBoundary):
		// The whole range lies at or before the boundary.
		level.Info(r.logger).Log(
			"msg", "query entirely out of retention, return empty result",
			"tenant", tenantID,
			"query_end", queryEnd,
			"retention_boundary", globalBoundary,
		)
		return RetentionCheckResult{EmptyResponse: true}
	}

	// The range straddles the boundary: keep the end, move the start.
	level.Info(r.logger).Log(
		"msg", "snapping query start to retention boundary",
		"tenant", tenantID,
		"original_start", queryStart,
		"adjusted_start", globalBoundary,
	)

	snapped := &adjustedParams{Params: params, adjustedStart: globalBoundary}
	return RetentionCheckResult{Params: snapped}
}
// handleStreamRetention decides how to run a query when per-stream retention
// rules exist for the tenant.
//
// The most restrictive (smallest) stream retention period is turned into a
// boundary relative to today. If the query starts before the global boundary
// (when global retention is set) or before that stream boundary, the query is
// rejected with 501 Not Implemented. Otherwise the original params are
// returned.
func (r *retentionChecker) handleStreamRetention(params logql.Params, tenantID string, queryStart time.Time, globalRetention time.Duration, globalBoundary time.Time, streamRetention []validation.StreamRetention, today time.Time) RetentionCheckResult {
	// Smallest period wins; zero doubles as the "nothing seen yet" marker.
	var minPeriod time.Duration
	for i := range streamRetention {
		candidate := time.Duration(streamRetention[i].Period)
		if minPeriod != 0 && candidate >= minPeriod {
			continue
		}
		minPeriod = candidate
	}
	streamBoundary := today.Add(-minPeriod)

	// reject logs the reason and produces the shared 501 result.
	reject := func(reason string) RetentionCheckResult {
		level.Info(r.logger).Log(
			"msg", reason,
			"tenant", tenantID,
			"query_start", queryStart,
			"global_boundary", globalBoundary,
			"stream_boundary", streamBoundary,
		)
		return RetentionCheckResult{
			Error: httpgrpc.Errorf(http.StatusNotImplemented,
				"query overlaps retention boundary and stream retention is configured - not supported by v2 engine"),
		}
	}

	// A query that starts before either boundary is deliberately not handled.
	switch {
	case globalRetention > 0 && globalBoundary.After(queryStart):
		return reject("query overlaps global retention boundary with stream retention configured, returning unimplemented")
	case minPeriod > 0 && streamBoundary.After(queryStart):
		return reject("query overlaps stream retention boundary, returning unimplemented")
	}

	// Query start is at or after both boundaries: entirely within retention.
	return RetentionCheckResult{Params: params}
}
// adjustedParams wraps logql.Params with an adjusted start time. Every other
// method is promoted unchanged from the embedded Params.
type adjustedParams struct {
logql.Params
// adjustedStart is the start time reported in place of the wrapped one.
adjustedStart time.Time
}
// Start returns the adjusted start time, shadowing the embedded Params' Start.
func (a *adjustedParams) Start() time.Time {
return a.adjustedStart
}

View File

@@ -0,0 +1,181 @@
package engine
import (
"net/http"
"testing"
"time"
"github.com/coder/quartz"
"github.com/go-kit/log"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/validation"
)
// fakeRetentionLimits is a map-backed RetentionLimits for tests. Tenants
// missing from a map get that map's zero value (0 duration / nil slice).
type fakeRetentionLimits struct {
// retentionPeriods maps tenant ID to its global retention period.
retentionPeriods map[string]time.Duration
// streamRetentions maps tenant ID to its per-stream retention rules.
streamRetentions map[string][]validation.StreamRetention
}
// RetentionPeriod returns the configured global retention for userID.
func (f fakeRetentionLimits) RetentionPeriod(userID string) time.Duration {
return f.retentionPeriods[userID]
}
// StreamRetention returns the configured stream retention rules for userID.
func (f fakeRetentionLimits) StreamRetention(userID string) []validation.StreamRetention {
return f.streamRetentions[userID]
}
// fakeParams is a minimal query-params stub for retention tests. Only the
// start and end times are configurable; every other method returns a fixed
// zero/nil value (Direction returns logproto.BACKWARD).
type fakeParams struct {
start time.Time
end time.Time
}
func (f fakeParams) QueryString() string { return "" }
func (f fakeParams) Start() time.Time { return f.start }
func (f fakeParams) End() time.Time { return f.end }
func (f fakeParams) Step() time.Duration { return 0 }
func (f fakeParams) Interval() time.Duration { return 0 }
func (f fakeParams) Limit() uint32 { return 0 }
func (f fakeParams) Direction() logproto.Direction { return logproto.BACKWARD }
func (f fakeParams) Shards() []string { return nil }
func (f fakeParams) GetExpression() syntax.Expr { return nil }
func (f fakeParams) GetStoreChunks() *logproto.ChunkRefGroup { return nil }
func (f fakeParams) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}
// TestRetentionChecker exercises retentionChecker.Validate for a tenant with a
// 30-day global retention, with and without per-stream retention rules. The
// clock is pinned so that retention boundaries are deterministic.
func TestRetentionChecker(t *testing.T) {
	now := time.Date(2026, 1, 19, 6, 15, 30, 0, time.UTC)
	today := now.Truncate(24 * time.Hour)

	globalRetention := 30 * 24 * time.Hour // 30 days
	globalBoundary := today.Add(-globalRetention)

	tests := []struct {
		name                string
		queryStart          time.Time
		queryEnd            time.Time
		streamRetention     []validation.StreamRetention
		expectEmptyResponse bool
		expectError         bool
		expectErrorCode     int
		expectAdjustedStart time.Time
	}{
		{
			name:                "global only: query entirely within retention",
			queryStart:          now.Add(-7 * 24 * time.Hour),
			queryEnd:            now.Add(-1 * time.Hour),
			expectAdjustedStart: now.Add(-7 * 24 * time.Hour), // no adjustment
		},
		{
			name:                "global only: query entirely out of retention",
			queryStart:          now.Add(-60 * 24 * time.Hour),
			queryEnd:            now.Add(-45 * 24 * time.Hour),
			expectEmptyResponse: true,
		},
		{
			// The boundary itself counts as out of retention for the query end.
			name:                "global only: query end exactly at boundary",
			queryStart:          globalBoundary.Add(-24 * time.Hour),
			queryEnd:            globalBoundary,
			expectEmptyResponse: true,
		},
		{
			name:                "global only: query overlaps retention boundary",
			queryStart:          now.Add(-45 * 24 * time.Hour),
			queryEnd:            now.Add(-1 * time.Hour),
			expectAdjustedStart: globalBoundary, // snapped to boundary
		},
		{
			name:                "global only: query start exactly at boundary",
			queryStart:          globalBoundary,
			queryEnd:            now.Add(-1 * time.Hour),
			expectAdjustedStart: globalBoundary,
		},
		// Stream retention cases (with global retention)
		{
			name:       "stream+global: query within both retentions",
			queryStart: now.Add(-10 * 24 * time.Hour), // 10 days
			queryEnd:   now.Add(-1 * time.Hour),
			streamRetention: []validation.StreamRetention{
				{Period: model.Duration(20 * 24 * time.Hour)}, // 20 days
				{Period: model.Duration(60 * 24 * time.Hour)}, // 60 days
			},
			expectAdjustedStart: now.Add(-10 * 24 * time.Hour), // no adjustment
		},
		{
			name:       "stream+global: query overlaps retention boundary",
			queryStart: now.Add(-35 * 24 * time.Hour),
			queryEnd:   now.Add(-1 * time.Hour),
			streamRetention: []validation.StreamRetention{
				{Period: model.Duration(20 * 24 * time.Hour)}, // 20 days
			},
			expectError:     true,
			expectErrorCode: http.StatusNotImplemented,
		},
		{
			name:       "stream+global: multiple stream retentions - uses smallest",
			queryStart: now.Add(-15 * 24 * time.Hour),
			queryEnd:   now.Add(-1 * time.Hour),
			streamRetention: []validation.StreamRetention{
				{Period: model.Duration(40 * 24 * time.Hour)},
				{Period: model.Duration(10 * 24 * time.Hour)}, // most restrictive
			},
			expectError:     true,
			expectErrorCode: http.StatusNotImplemented,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			clock := quartz.NewMock(t)
			clock.Set(now)

			limits := fakeRetentionLimits{
				retentionPeriods: map[string]time.Duration{
					"test-tenant": globalRetention,
				},
				streamRetentions: map[string][]validation.StreamRetention{
					"test-tenant": tt.streamRetention,
				},
			}

			rc := &retentionChecker{
				limits: limits,
				logger: log.NewNopLogger(),
				clock:  clock,
			}

			ctx := user.InjectOrgID(t.Context(), "test-tenant")
			params := fakeParams{
				start: tt.queryStart,
				end:   tt.queryEnd,
			}

			result := rc.Validate(ctx, params)

			// Check error case
			if tt.expectError {
				require.Error(t, result.Error)
				if tt.expectErrorCode > 0 {
					resp, ok := httpgrpc.HTTPResponseFromError(result.Error)
					require.True(t, ok)
					require.Equal(t, int32(tt.expectErrorCode), resp.Code)
				}
				return
			}
			require.NoError(t, result.Error)

			if tt.expectEmptyResponse {
				require.True(t, result.EmptyResponse)
				return
			}

			require.NotNil(t, result.Params)
			require.Equal(t, tt.expectAdjustedStart, result.Params.Start())
			require.Equal(t, tt.queryEnd, result.Params.End())
		})
	}
}