mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
chore: extend query blocking capabilities (#19551)
This commit is contained in:
@@ -34,6 +34,14 @@ overrides:
|
||||
# block any query that matches this query hash
|
||||
- hash: 2943214005 # hash of {stream="stdout",pod="loki-canary-9w49x"}
|
||||
types: filter,limited
|
||||
|
||||
# block queries originating from specific sources via X-Query-Tags
|
||||
# Keys and values are matched case-insensitively.
|
||||
- pattern: '.*' # optional; if pattern and regex are omittied they will default to '.*' and true
|
||||
regex: true
|
||||
tags:
|
||||
source: grafana
|
||||
feature: beta
|
||||
```
|
||||
{{< admonition type="note" >}}
|
||||
Changes to these configurations **do not require a restart**; they are defined in the [runtime configuration file](https://grafana.com/docs/loki/<LOKI_VERSION>/configure/#runtime-configuration-file).
|
||||
@@ -61,6 +69,48 @@ The order of patterns is preserved, so the first matching pattern will be used.
|
||||
|
||||
Blocked queries are logged, as well as counted in the `loki_blocked_queries` metric on a per-tenant basis.
|
||||
|
||||
When a policy matches by pattern/hash/regex, Loki logs whether the query type and request tags matched that policy:
|
||||
|
||||
```logfmt
|
||||
level=warn msg="query blocker matched with regex policy" user=29 type=metric pattern=".*rate\\(.*\\).*" query="sum(rate({app=\"foo\"}[5m]))" typesMatched=true tagsMatched=false blocked=false
|
||||
```
|
||||
|
||||
If tag constraints fail to match, Loki emits a debug log showing the missing key and the raw header value that was received:
|
||||
|
||||
```logfmt
|
||||
level=debug msg="query blocker tags mismatch: missing or mismatched key" key=feature tagsRaw="Source=grafana,Feature=alpha"
|
||||
```
|
||||
|
||||
## Scope
|
||||
|
||||
Queries received via the API and executed as [alerting/recording rules](../../alert/) will be blocked.
|
||||
|
||||
## Tag-based blocking
|
||||
|
||||
You can scope a blocked query rule to requests that include specific key=value pairs in the `X-Query-Tags` header.
|
||||
|
||||
- Header format: `key=value` pairs separated by commas, for example: `Source=grafana,Feature=beta`.
|
||||
- Allowed characters are alphanumeric plus space, comma, equals, '@', '.', and '-'. Any other characters are replaced with `_`.
|
||||
- Parsing keeps only canonical `key=value` tokens; malformed tokens are ignored.
|
||||
- Matching rules:
|
||||
- Keys are matched case-insensitively (the server lowercases keys).
|
||||
- Values are matched case-insensitively.
|
||||
- All specified `tags:` pairs in the rule must be present in the request to apply the block.
|
||||
|
||||
Examples:
|
||||
|
||||
```yaml
|
||||
overrides:
|
||||
tenant-a:
|
||||
blocked_queries:
|
||||
# Block only metric queries from a beta feature flag
|
||||
- types: metric
|
||||
tags:
|
||||
feature: beta
|
||||
|
||||
# Combine with regex to narrow scope further
|
||||
- pattern: '.*rate\\(.*\\).*'
|
||||
regex: true
|
||||
tags:
|
||||
source: grafana
|
||||
```
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/grafana/regexp"
|
||||
|
||||
"github.com/grafana/loki/v3/pkg/util"
|
||||
"github.com/grafana/loki/v3/pkg/util/httpreq"
|
||||
logutil "github.com/grafana/loki/v3/pkg/util/log"
|
||||
"github.com/grafana/loki/v3/pkg/util/validation"
|
||||
)
|
||||
@@ -45,8 +46,9 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
|
||||
|
||||
if b.Hash > 0 {
|
||||
if b.Hash == util.HashedQuery(query) {
|
||||
level.Warn(logger).Log("msg", "query blocker matched with hash policy", "hash", b.Hash, "query", query)
|
||||
return qb.block(b, typ, logger)
|
||||
typesMatched, tagsMatched, blocked := qb.block(ctx, b, typ, logger)
|
||||
level.Warn(logger).Log("msg", "query blocker matched with hash policy", "hash", b.Hash, "query", query, "typesMatched", typesMatched, "tagsMatched", tagsMatched, "blocked", blocked)
|
||||
return blocked
|
||||
}
|
||||
|
||||
return false
|
||||
@@ -59,8 +61,9 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
|
||||
}
|
||||
|
||||
if strings.TrimSpace(b.Pattern) == strings.TrimSpace(query) {
|
||||
level.Warn(logger).Log("msg", "query blocker matched with exact match policy", "query", query)
|
||||
return qb.block(b, typ, logger)
|
||||
typesMatched, tagsMatched, blocked := qb.block(ctx, b, typ, logger)
|
||||
level.Warn(logger).Log("msg", "query blocker matched with exact match policy", "query", query, "typesMatched", typesMatched, "tagsMatched", tagsMatched, "blocked", blocked)
|
||||
return blocked
|
||||
}
|
||||
|
||||
if b.Regex {
|
||||
@@ -71,8 +74,9 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
|
||||
}
|
||||
|
||||
if r.MatchString(query) {
|
||||
level.Warn(logger).Log("msg", "query blocker matched with regex policy", "pattern", b.Pattern, "query", query)
|
||||
return qb.block(b, typ, logger)
|
||||
typesMatched, tagsMatched, blocked := qb.block(ctx, b, typ, logger)
|
||||
level.Warn(logger).Log("msg", "query blocker matched with regex policy", "pattern", b.Pattern, "query", query, "typesMatched", typesMatched, "tagsMatched", tagsMatched, "blocked", blocked)
|
||||
return blocked
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -80,10 +84,12 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (qb *queryBlocker) block(q *validation.BlockedQuery, typ string, logger log.Logger) bool {
|
||||
// no specific types to validate against, so query is blocked
|
||||
func (qb *queryBlocker) block(ctx context.Context, q *validation.BlockedQuery, typ string, logger log.Logger) (bool, bool, bool) {
|
||||
// returns: (typesMatched, tagsMatched, blocked)
|
||||
// no specific types to validate against, so only tags (if any) need to match
|
||||
if len(q.Types) == 0 {
|
||||
return true
|
||||
tagsMatched := qb.tagsMatch(ctx, q, logger)
|
||||
return true, tagsMatched, tagsMatched
|
||||
}
|
||||
|
||||
matched := false
|
||||
@@ -97,8 +103,55 @@ func (qb *queryBlocker) block(q *validation.BlockedQuery, typ string, logger log
|
||||
// query would be blocked, but it didn't match specified types
|
||||
if !matched {
|
||||
level.Debug(logger).Log("msg", "query blocker matched pattern, but not specified types", "pattern", q.Pattern, "regex", q.Regex, "hash", q.Hash, "types", q.Types.String(), "queryType", typ)
|
||||
return false
|
||||
return false, false, false
|
||||
}
|
||||
|
||||
return true
|
||||
// Types matched; ensure tags (if any) also match
|
||||
tagsMatched := qb.tagsMatch(ctx, q, logger)
|
||||
return true, tagsMatched, tagsMatched
|
||||
}
|
||||
|
||||
func (qb *queryBlocker) tagsMatch(ctx context.Context, q *validation.BlockedQuery, logger log.Logger) bool {
|
||||
// if no tags are expected, we treat all queries as matching
|
||||
if len(q.Tags) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
raw := httpreq.ExtractQueryTagsFromContext(ctx)
|
||||
// TagsToKeyValues is expected to always return an even set of key value pairs
|
||||
kvs := httpreq.TagsToKeyValues(raw)
|
||||
|
||||
// Build a lowercased expected map once (size m) and scan kvs once (size n)
|
||||
expected := make(map[string]string, len(q.Tags))
|
||||
for k, v := range q.Tags {
|
||||
expected[strings.ToLower(k)] = v
|
||||
}
|
||||
|
||||
// iterate over the keys in the context and see if they match the expected tags
|
||||
for i := 0; i+1 < len(kvs) && len(expected) > 0; i += 2 {
|
||||
k, okK := kvs[i].(string)
|
||||
v, okV := kvs[i+1].(string)
|
||||
if !okK || !okV {
|
||||
continue
|
||||
}
|
||||
|
||||
keyLower := strings.ToLower(k)
|
||||
if expVal, ok := expected[keyLower]; ok {
|
||||
if strings.EqualFold(v, expVal) {
|
||||
// this key and value match, remove this key from the expected map of tags
|
||||
delete(expected, keyLower)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if all expect tags matched, they would all have been removed from the map
|
||||
// we only block the query if all expected tags matched
|
||||
if len(expected) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
for k := range expected {
|
||||
level.Debug(logger).Log("msg", "query blocker tags mismatch: missing or mismatched key", "key", k, "tagsRaw", raw)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/grafana/loki/v3/pkg/logproto"
|
||||
"github.com/grafana/loki/v3/pkg/logqlmodel"
|
||||
"github.com/grafana/loki/v3/pkg/util"
|
||||
"github.com/grafana/loki/v3/pkg/util/httpreq"
|
||||
"github.com/grafana/loki/v3/pkg/util/validation"
|
||||
)
|
||||
|
||||
@@ -160,3 +161,95 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_ExecWithBlockedQueries_Tags(t *testing.T) {
|
||||
limits := &fakeLimits{maxSeries: 10}
|
||||
eng := NewEngine(EngineOpts{}, getLocalQuerier(100000), limits, log.NewNopLogger())
|
||||
|
||||
defaultQuery := `topk(1,rate(({app=~"foo|bar"})[1m]))`
|
||||
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
q string
|
||||
tagsHeader string
|
||||
blocked []*validation.BlockedQuery
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "block when tags match and no types",
|
||||
q: defaultQuery,
|
||||
tagsHeader: "Source=grafana,Feature=beta",
|
||||
blocked: []*validation.BlockedQuery{
|
||||
{
|
||||
// no pattern specified -> matches all by default
|
||||
Tags: map[string]string{"source": "grafana", "feature": "beta"},
|
||||
},
|
||||
},
|
||||
expectedErr: logqlmodel.ErrBlocked,
|
||||
},
|
||||
{
|
||||
name: "do not block when tags value mismatches",
|
||||
q: defaultQuery,
|
||||
tagsHeader: "Source=grafana,Feature=alpha",
|
||||
blocked: []*validation.BlockedQuery{
|
||||
{
|
||||
Pattern: ".*",
|
||||
Regex: true,
|
||||
Tags: map[string]string{"feature": "beta"},
|
||||
},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "block when types and tags match",
|
||||
q: defaultQuery,
|
||||
tagsHeader: "Source=grafana,Feature=beta",
|
||||
blocked: []*validation.BlockedQuery{
|
||||
{
|
||||
Pattern: ".*",
|
||||
Regex: true,
|
||||
Types: []string{QueryTypeMetric},
|
||||
Tags: map[string]string{"source": "GRAFANA", "feature": "BETA"}, // case-insensitive
|
||||
},
|
||||
},
|
||||
expectedErr: logqlmodel.ErrBlocked,
|
||||
},
|
||||
{
|
||||
name: "do not block when types match but required tag key missing",
|
||||
q: defaultQuery,
|
||||
tagsHeader: "Source=grafana",
|
||||
blocked: []*validation.BlockedQuery{
|
||||
{
|
||||
Pattern: ".*",
|
||||
Regex: true,
|
||||
Types: []string{QueryTypeMetric},
|
||||
Tags: map[string]string{"feature": "beta"},
|
||||
},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
limits.blockedQueries = test.blocked
|
||||
|
||||
params, err := NewLiteralParams(test.q, time.Unix(0, 0), time.Unix(100000, 0), 60*time.Second, 0, logproto.FORWARD, 1000, nil, nil)
|
||||
require.NoError(t, err)
|
||||
q := eng.Query(params)
|
||||
|
||||
ctx := user.InjectOrgID(context.Background(), "fake")
|
||||
if test.tagsHeader != "" {
|
||||
ctx = httpreq.InjectQueryTags(ctx, test.tagsHeader)
|
||||
}
|
||||
|
||||
_, err = q.Exec(ctx)
|
||||
|
||||
if test.expectedErr == nil {
|
||||
require.NoError(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err.Error(), test.expectedErr.Error())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,4 +7,8 @@ type BlockedQuery struct {
|
||||
Regex bool `yaml:"regex"`
|
||||
Hash uint32 `yaml:"hash"`
|
||||
Types flagext.StringSliceCSV `yaml:"types"`
|
||||
// Tags defines a set of key=value constraints that must all match the
|
||||
// incoming request tags (from X-Query-Tags) for this rule to apply.
|
||||
// Keys are case-insensitive; values are matched case-insensitively.
|
||||
Tags map[string]string `yaml:"query_tags"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user