mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
feat: Introduce special header that tells Loki not to modify query results (#12327)
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
//go:build integration
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//go:build integration
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//go:build integration
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//go:build integration
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//go:build integration
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//go:build integration
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//go:build integration
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//go:build integration
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
|
||||
@@ -91,6 +91,7 @@ func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamC
|
||||
var unaryInterceptors []grpc.UnaryClientInterceptor
|
||||
unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
|
||||
unaryInterceptors = append(unaryInterceptors, server.UnaryClientQueryTagsInterceptor)
|
||||
unaryInterceptors = append(unaryInterceptors, server.UnaryClientHTTPHeadersInterceptor)
|
||||
unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
|
||||
if !cfg.Internal {
|
||||
unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
|
||||
@@ -100,6 +101,7 @@ func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamC
|
||||
var streamInterceptors []grpc.StreamClientInterceptor
|
||||
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
|
||||
streamInterceptors = append(streamInterceptors, server.StreamClientQueryTagsInterceptor)
|
||||
streamInterceptors = append(streamInterceptors, server.StreamClientHTTPHeadersInterceptor)
|
||||
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
|
||||
if !cfg.Internal {
|
||||
streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/loki/pkg/util/httpreq"
|
||||
|
||||
"github.com/go-kit/log/level"
|
||||
"github.com/grafana/dskit/httpgrpc"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
@@ -436,7 +438,7 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if i.pipelineWrapper != nil {
|
||||
if i.pipelineWrapper != nil && httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" {
|
||||
userID, err := tenant.TenantID(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -490,7 +492,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if i.extractorWrapper != nil {
|
||||
if i.extractorWrapper != nil && httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" {
|
||||
userID, err := tenant.TenantID(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/loki/pkg/util/httpreq"
|
||||
|
||||
"github.com/grafana/dskit/tenant"
|
||||
"github.com/grafana/dskit/user"
|
||||
|
||||
@@ -717,6 +719,47 @@ func Test_PipelineWrapper(t *testing.T) {
|
||||
require.Equal(t, 10, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper
|
||||
}
|
||||
|
||||
func Test_PipelineWrapper_disabled(t *testing.T) {
|
||||
instance := defaultInstance(t)
|
||||
|
||||
wrapper := &testPipelineWrapper{
|
||||
pipeline: newMockPipeline(),
|
||||
}
|
||||
instance.pipelineWrapper = wrapper
|
||||
|
||||
ctx := user.InjectOrgID(context.Background(), "test-user")
|
||||
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true")
|
||||
_, err := tenant.TenantID(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
it, err := instance.Query(ctx,
|
||||
logql.SelectLogParams{
|
||||
QueryRequest: &logproto.QueryRequest{
|
||||
Selector: `{job="3"}`,
|
||||
Limit: uint32(2),
|
||||
Start: time.Unix(0, 0),
|
||||
End: time.Unix(0, 100000000),
|
||||
Direction: logproto.BACKWARD,
|
||||
Shards: []string{astmapper.ShardAnnotation{Shard: 0, Of: 1}.String()},
|
||||
Plan: &plan.QueryPlan{
|
||||
AST: syntax.MustParseExpr(`{job="3"}`),
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
defer it.Close()
|
||||
|
||||
for it.Next() {
|
||||
// Consume the iterator
|
||||
require.NoError(t, it.Error())
|
||||
}
|
||||
|
||||
require.Equal(t, "", wrapper.tenant)
|
||||
require.Equal(t, ``, wrapper.query)
|
||||
require.Equal(t, 0, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper
|
||||
}
|
||||
|
||||
type testPipelineWrapper struct {
|
||||
query string
|
||||
tenant string
|
||||
@@ -807,6 +850,41 @@ func Test_ExtractorWrapper(t *testing.T) {
|
||||
require.Equal(t, 10, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
|
||||
}
|
||||
|
||||
func Test_ExtractorWrapper_disabled(t *testing.T) {
|
||||
instance := defaultInstance(t)
|
||||
|
||||
wrapper := &testExtractorWrapper{
|
||||
extractor: newMockExtractor(),
|
||||
}
|
||||
instance.extractorWrapper = wrapper
|
||||
|
||||
ctx := user.InjectOrgID(context.Background(), "test-user")
|
||||
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true")
|
||||
it, err := instance.QuerySample(ctx,
|
||||
logql.SelectSampleParams{
|
||||
SampleQueryRequest: &logproto.SampleQueryRequest{
|
||||
Selector: `sum(count_over_time({job="3"}[1m]))`,
|
||||
Start: time.Unix(0, 0),
|
||||
End: time.Unix(0, 100000000),
|
||||
Shards: []string{astmapper.ShardAnnotation{Shard: 0, Of: 1}.String()},
|
||||
Plan: &plan.QueryPlan{
|
||||
AST: syntax.MustParseExpr(`sum(count_over_time({job="3"}[1m]))`),
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
defer it.Close()
|
||||
|
||||
for it.Next() {
|
||||
// Consume the iterator
|
||||
require.NoError(t, it.Error())
|
||||
}
|
||||
|
||||
require.Equal(t, ``, wrapper.query)
|
||||
require.Equal(t, 0, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
|
||||
}
|
||||
|
||||
type testExtractorWrapper struct {
|
||||
query string
|
||||
tenant string
|
||||
|
||||
@@ -607,7 +607,7 @@ func (t *Loki) setupModuleManager() error {
|
||||
mm.RegisterModule(Querier, t.initQuerier)
|
||||
mm.RegisterModule(Ingester, t.initIngester)
|
||||
mm.RegisterModule(IngesterQuerier, t.initIngesterQuerier)
|
||||
mm.RegisterModule(IngesterQueryTagsInterceptors, t.initIngesterQueryTagsInterceptors, modules.UserInvisibleModule)
|
||||
mm.RegisterModule(IngesterGRPCInterceptors, t.initIngesterGRPCInterceptors, modules.UserInvisibleModule)
|
||||
mm.RegisterModule(QueryFrontendTripperware, t.initQueryFrontendMiddleware, modules.UserInvisibleModule)
|
||||
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
|
||||
mm.RegisterModule(RulerStorage, t.initRulerStorage, modules.UserInvisibleModule)
|
||||
@@ -714,7 +714,7 @@ func (t *Loki) setupModuleManager() error {
|
||||
|
||||
// Initialise query tags interceptors on targets running ingester
|
||||
if t.Cfg.isModuleEnabled(Ingester) || t.Cfg.isModuleEnabled(Write) || t.Cfg.isModuleEnabled(All) {
|
||||
deps[Server] = append(deps[Server], IngesterQueryTagsInterceptors)
|
||||
deps[Server] = append(deps[Server], IngesterGRPCInterceptors)
|
||||
}
|
||||
|
||||
// Add bloom gateway ring in client mode to IndexGateway service dependencies if bloom filtering is enabled.
|
||||
|
||||
@@ -86,47 +86,47 @@ const maxChunkAgeForTableManager = 12 * time.Hour
|
||||
|
||||
// The various modules that make up Loki.
|
||||
const (
|
||||
Ring string = "ring"
|
||||
RuntimeConfig string = "runtime-config"
|
||||
Overrides string = "overrides"
|
||||
OverridesExporter string = "overrides-exporter"
|
||||
TenantConfigs string = "tenant-configs"
|
||||
Server string = "server"
|
||||
InternalServer string = "internal-server"
|
||||
Distributor string = "distributor"
|
||||
Querier string = "querier"
|
||||
CacheGenerationLoader string = "cache-generation-loader"
|
||||
Ingester string = "ingester"
|
||||
IngesterQuerier string = "ingester-querier"
|
||||
IngesterQueryTagsInterceptors string = "ingester-query-tags-interceptors"
|
||||
QueryFrontend string = "query-frontend"
|
||||
QueryFrontendTripperware string = "query-frontend-tripperware"
|
||||
QueryLimiter string = "query-limiter"
|
||||
QueryLimitsInterceptors string = "query-limits-interceptors"
|
||||
QueryLimitsTripperware string = "query-limits-tripper"
|
||||
RulerStorage string = "ruler-storage"
|
||||
Ruler string = "ruler"
|
||||
RuleEvaluator string = "rule-evaluator"
|
||||
Store string = "store"
|
||||
TableManager string = "table-manager"
|
||||
MemberlistKV string = "memberlist-kv"
|
||||
Compactor string = "compactor"
|
||||
BloomGateway string = "bloom-gateway"
|
||||
BloomGatewayRing string = "bloom-gateway-ring"
|
||||
IndexGateway string = "index-gateway"
|
||||
IndexGatewayRing string = "index-gateway-ring"
|
||||
IndexGatewayInterceptors string = "index-gateway-interceptors"
|
||||
QueryScheduler string = "query-scheduler"
|
||||
QuerySchedulerRing string = "query-scheduler-ring"
|
||||
BloomCompactor string = "bloom-compactor"
|
||||
BloomCompactorRing string = "bloom-compactor-ring"
|
||||
BloomStore string = "bloom-store"
|
||||
All string = "all"
|
||||
Read string = "read"
|
||||
Write string = "write"
|
||||
Backend string = "backend"
|
||||
Analytics string = "analytics"
|
||||
InitCodec string = "init-codec"
|
||||
Ring string = "ring"
|
||||
RuntimeConfig string = "runtime-config"
|
||||
Overrides string = "overrides"
|
||||
OverridesExporter string = "overrides-exporter"
|
||||
TenantConfigs string = "tenant-configs"
|
||||
Server string = "server"
|
||||
InternalServer string = "internal-server"
|
||||
Distributor string = "distributor"
|
||||
Querier string = "querier"
|
||||
CacheGenerationLoader string = "cache-generation-loader"
|
||||
Ingester string = "ingester"
|
||||
IngesterQuerier string = "ingester-querier"
|
||||
IngesterGRPCInterceptors string = "ingester-query-tags-interceptors"
|
||||
QueryFrontend string = "query-frontend"
|
||||
QueryFrontendTripperware string = "query-frontend-tripperware"
|
||||
QueryLimiter string = "query-limiter"
|
||||
QueryLimitsInterceptors string = "query-limits-interceptors"
|
||||
QueryLimitsTripperware string = "query-limits-tripper"
|
||||
RulerStorage string = "ruler-storage"
|
||||
Ruler string = "ruler"
|
||||
RuleEvaluator string = "rule-evaluator"
|
||||
Store string = "store"
|
||||
TableManager string = "table-manager"
|
||||
MemberlistKV string = "memberlist-kv"
|
||||
Compactor string = "compactor"
|
||||
BloomGateway string = "bloom-gateway"
|
||||
BloomGatewayRing string = "bloom-gateway-ring"
|
||||
IndexGateway string = "index-gateway"
|
||||
IndexGatewayRing string = "index-gateway-ring"
|
||||
IndexGatewayInterceptors string = "index-gateway-interceptors"
|
||||
QueryScheduler string = "query-scheduler"
|
||||
QuerySchedulerRing string = "query-scheduler-ring"
|
||||
BloomCompactor string = "bloom-compactor"
|
||||
BloomCompactorRing string = "bloom-compactor-ring"
|
||||
BloomStore string = "bloom-store"
|
||||
All string = "all"
|
||||
Read string = "read"
|
||||
Write string = "write"
|
||||
Backend string = "backend"
|
||||
Analytics string = "analytics"
|
||||
InitCodec string = "init-codec"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -404,7 +404,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
|
||||
toMerge := []middleware.Interface{
|
||||
httpreq.ExtractQueryMetricsMiddleware(),
|
||||
httpreq.ExtractQueryTagsMiddleware(),
|
||||
httpreq.PropagateHeadersMiddleware(httpreq.LokiEncodingFlagsHeader),
|
||||
httpreq.PropagateHeadersMiddleware(httpreq.LokiEncodingFlagsHeader, httpreq.LokiDisablePipelineWrappersHeader),
|
||||
serverutil.RecoveryHTTPMiddleware,
|
||||
t.HTTPAuthMiddleware,
|
||||
serverutil.NewPrepopulateMiddleware(),
|
||||
@@ -983,7 +983,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
|
||||
|
||||
toMerge := []middleware.Interface{
|
||||
httpreq.ExtractQueryTagsMiddleware(),
|
||||
httpreq.PropagateHeadersMiddleware(httpreq.LokiActorPathHeader, httpreq.LokiEncodingFlagsHeader),
|
||||
httpreq.PropagateHeadersMiddleware(httpreq.LokiActorPathHeader, httpreq.LokiEncodingFlagsHeader, httpreq.LokiDisablePipelineWrappersHeader),
|
||||
serverutil.RecoveryHTTPMiddleware,
|
||||
t.HTTPAuthMiddleware,
|
||||
queryrange.StatsHTTPMiddleware,
|
||||
@@ -1573,10 +1573,19 @@ func (t *Loki) initQueryLimitsInterceptors() (services.Service, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (t *Loki) initIngesterQueryTagsInterceptors() (services.Service, error) {
|
||||
func (t *Loki) initIngesterGRPCInterceptors() (services.Service, error) {
|
||||
_ = level.Debug(util_log.Logger).Log("msg", "initializing ingester query tags interceptors")
|
||||
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, serverutil.StreamServerQueryTagsInterceptor)
|
||||
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, serverutil.UnaryServerQueryTagsInterceptor)
|
||||
t.Cfg.Server.GRPCStreamMiddleware = append(
|
||||
t.Cfg.Server.GRPCStreamMiddleware,
|
||||
serverutil.StreamServerQueryTagsInterceptor,
|
||||
serverutil.StreamServerHTTPHeadersInterceptor,
|
||||
)
|
||||
|
||||
t.Cfg.Server.GRPCMiddleware = append(
|
||||
t.Cfg.Server.GRPCMiddleware,
|
||||
serverutil.UnaryServerQueryTagsInterceptor,
|
||||
serverutil.UnaryServerHTTPHeadersnIterceptor,
|
||||
)
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@@ -612,6 +612,11 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
|
||||
header.Set(httpreq.LokiActorPathHeader, actor)
|
||||
}
|
||||
|
||||
// Add disable wrappers
|
||||
if disableWrappers := httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader); disableWrappers != "" {
|
||||
header.Set(httpreq.LokiDisablePipelineWrappersHeader, disableWrappers)
|
||||
}
|
||||
|
||||
// Add limits
|
||||
if limits := querylimits.ExtractQueryLimitsContext(ctx); limits != nil {
|
||||
err := querylimits.InjectQueryLimitsHeader(&header, limits)
|
||||
|
||||
@@ -272,6 +272,11 @@ func (Codec) QueryRequestUnwrap(ctx context.Context, req *QueryRequest) (queryra
|
||||
ctx = httpreq.InjectActorPath(ctx, actor)
|
||||
}
|
||||
|
||||
// Add disable wrappers
|
||||
if disableWrappers, ok := req.Metadata[httpreq.LokiDisablePipelineWrappersHeader]; ok {
|
||||
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, disableWrappers)
|
||||
}
|
||||
|
||||
// Add limits
|
||||
if encodedLimits, ok := req.Metadata[querylimits.HTTPHeaderQueryLimitsKey]; ok {
|
||||
limits, err := querylimits.UnmarshalQueryLimits([]byte(encodedLimits))
|
||||
@@ -364,6 +369,12 @@ func (Codec) QueryRequestWrap(ctx context.Context, r queryrangebase.Request) (*Q
|
||||
result.Metadata[httpreq.LokiActorPathHeader] = actor
|
||||
}
|
||||
|
||||
// Keep disable wrappers
|
||||
disableWrappers := httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader)
|
||||
if disableWrappers != "" {
|
||||
result.Metadata[httpreq.LokiDisablePipelineWrappersHeader] = disableWrappers
|
||||
}
|
||||
|
||||
// Add limits
|
||||
limits := querylimits.ExtractQueryLimitsContext(ctx)
|
||||
if limits != nil {
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/loki/pkg/util/httpreq"
|
||||
|
||||
lokilog "github.com/grafana/loki/pkg/logql/log"
|
||||
|
||||
"github.com/go-kit/log"
|
||||
@@ -507,7 +509,7 @@ func (s *LokiStore) SelectLogs(ctx context.Context, req logql.SelectLogParams) (
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.pipelineWrapper != nil {
|
||||
if s.pipelineWrapper != nil && httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" {
|
||||
userID, err := tenant.TenantID(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -554,7 +556,7 @@ func (s *LokiStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.extractorWrapper != nil {
|
||||
if s.extractorWrapper != nil && httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" {
|
||||
userID, err := tenant.TenantID(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/loki/pkg/util/httpreq"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"github.com/go-kit/log"
|
||||
"github.com/grafana/dskit/flagext"
|
||||
@@ -932,6 +934,37 @@ func Test_PipelineWrapper(t *testing.T) {
|
||||
require.Equal(t, 28, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper
|
||||
}
|
||||
|
||||
func Test_PipelineWrapper_disabled(t *testing.T) {
|
||||
s := &LokiStore{
|
||||
Store: storeFixture,
|
||||
cfg: Config{
|
||||
MaxChunkBatchSize: 10,
|
||||
},
|
||||
chunkMetrics: NilMetrics,
|
||||
}
|
||||
wrapper := &testPipelineWrapper{
|
||||
pipeline: newMockPipeline(),
|
||||
}
|
||||
|
||||
s.SetPipelineWrapper(wrapper)
|
||||
ctx = user.InjectOrgID(context.Background(), "test-user")
|
||||
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true")
|
||||
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)})
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("store.SelectLogs() error = %v", err)
|
||||
return
|
||||
}
|
||||
defer logit.Close()
|
||||
for logit.Next() {
|
||||
require.NoError(t, logit.Error()) // consume the iterator
|
||||
}
|
||||
|
||||
require.Equal(t, "", wrapper.tenant)
|
||||
require.Equal(t, "", wrapper.query)
|
||||
require.Equal(t, 0, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper
|
||||
}
|
||||
|
||||
type testPipelineWrapper struct {
|
||||
query string
|
||||
pipeline *mockPipeline
|
||||
@@ -1017,6 +1050,36 @@ func Test_SampleWrapper(t *testing.T) {
|
||||
require.Equal(t, 28, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
|
||||
}
|
||||
|
||||
func Test_SampleWrapper_disabled(t *testing.T) {
|
||||
s := &LokiStore{
|
||||
Store: storeFixture,
|
||||
cfg: Config{
|
||||
MaxChunkBatchSize: 10,
|
||||
},
|
||||
chunkMetrics: NilMetrics,
|
||||
}
|
||||
wrapper := &testExtractorWrapper{
|
||||
extractor: newMockExtractor(),
|
||||
}
|
||||
s.SetExtractorWrapper(wrapper)
|
||||
|
||||
ctx = user.InjectOrgID(context.Background(), "test-user")
|
||||
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true")
|
||||
it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 3}}, nil)})
|
||||
if err != nil {
|
||||
t.Errorf("store.SelectSamples() error = %v", err)
|
||||
return
|
||||
}
|
||||
defer it.Close()
|
||||
for it.Next() {
|
||||
require.NoError(t, it.Error()) // consume the iterator
|
||||
}
|
||||
|
||||
require.Equal(t, "", wrapper.tenant)
|
||||
require.Equal(t, "", wrapper.query)
|
||||
require.Equal(t, 0, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
|
||||
}
|
||||
|
||||
type testExtractorWrapper struct {
|
||||
query string
|
||||
tenant string
|
||||
|
||||
@@ -12,7 +12,8 @@ type headerContextKey string
|
||||
|
||||
var (
|
||||
// LokiActorPathHeader is the name of the header e.g. used to enqueue requests in hierarchical queues.
|
||||
LokiActorPathHeader = "X-Loki-Actor-Path"
|
||||
LokiActorPathHeader = "X-Loki-Actor-Path"
|
||||
LokiDisablePipelineWrappersHeader = "X-Loki-Disable-Pipeline-Wrappers"
|
||||
|
||||
// LokiActorPathDelimiter is the delimiter used to serialise the hierarchy of the actor.
|
||||
LokiActorPathDelimiter = "|"
|
||||
@@ -50,3 +51,7 @@ func ExtractActorPath(ctx context.Context) []string {
|
||||
func InjectActorPath(ctx context.Context, value string) context.Context {
|
||||
return context.WithValue(ctx, headerContextKey(LokiActorPathHeader), value)
|
||||
}
|
||||
|
||||
func InjectHeader(ctx context.Context, key, value string) context.Context {
|
||||
return context.WithValue(ctx, headerContextKey(key), value)
|
||||
}
|
||||
|
||||
61
pkg/util/server/grpc_headers.go
Normal file
61
pkg/util/server/grpc_headers.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
"github.com/grafana/loki/pkg/util/httpreq"
|
||||
)
|
||||
|
||||
func injectHTTPHeadersIntoGRPCRequest(ctx context.Context) context.Context {
|
||||
header := httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader)
|
||||
if header == "" {
|
||||
return ctx
|
||||
}
|
||||
|
||||
// inject into GRPC metadata
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
md = metadata.New(map[string]string{})
|
||||
}
|
||||
md = md.Copy()
|
||||
md.Set(httpreq.LokiDisablePipelineWrappersHeader, header)
|
||||
|
||||
return metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
||||
|
||||
func extractHTTPHeadersFromGRPCRequest(ctx context.Context) context.Context {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
// No metadata, just return as is
|
||||
return ctx
|
||||
}
|
||||
|
||||
headerValues := md.Get(httpreq.LokiDisablePipelineWrappersHeader)
|
||||
if len(headerValues) == 0 {
|
||||
return ctx
|
||||
}
|
||||
|
||||
return httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, headerValues[0])
|
||||
}
|
||||
|
||||
func UnaryClientHTTPHeadersInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
return invoker(injectHTTPHeadersIntoGRPCRequest(ctx), method, req, reply, cc, opts...)
|
||||
}
|
||||
|
||||
func StreamClientHTTPHeadersInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
return streamer(injectHTTPHeadersIntoGRPCRequest(ctx), desc, cc, method, opts...)
|
||||
}
|
||||
|
||||
func UnaryServerHTTPHeadersnIterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
return handler(extractHTTPHeadersFromGRPCRequest(ctx), req)
|
||||
}
|
||||
|
||||
func StreamServerHTTPHeadersInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
return handler(srv, serverStream{
|
||||
ctx: extractHTTPHeadersFromGRPCRequest(ss.Context()),
|
||||
ServerStream: ss,
|
||||
})
|
||||
}
|
||||
81
pkg/util/server/grpc_headers_test.go
Normal file
81
pkg/util/server/grpc_headers_test.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
"github.com/grafana/loki/pkg/util/httpreq"
|
||||
)
|
||||
|
||||
func TestInjectHTTPHeaderIntoGRPCRequest(t *testing.T) {
|
||||
for _, tt := range []struct {
|
||||
name, header string
|
||||
md, expectMetadata metadata.MD
|
||||
}{
|
||||
{
|
||||
name: "creates new metadata and sets header",
|
||||
header: "true",
|
||||
expectMetadata: metadata.New(map[string]string{httpreq.LokiDisablePipelineWrappersHeader: "true"}),
|
||||
},
|
||||
{
|
||||
name: "sets header on existing metadata",
|
||||
header: "true",
|
||||
md: metadata.New(map[string]string{"x-foo": "bar"}),
|
||||
expectMetadata: metadata.New(map[string]string{"x-foo": "bar", httpreq.LokiDisablePipelineWrappersHeader: "true"}),
|
||||
},
|
||||
{
|
||||
name: "no header, leave metadata untouched",
|
||||
md: metadata.New(map[string]string{"x-foo": "bar"}),
|
||||
expectMetadata: metadata.New(map[string]string{"x-foo": "bar"}),
|
||||
},
|
||||
{
|
||||
name: "no header",
|
||||
expectMetadata: nil,
|
||||
},
|
||||
} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if tt.header != "" {
|
||||
ctx = httpreq.InjectHeader(context.Background(), httpreq.LokiDisablePipelineWrappersHeader, tt.header)
|
||||
}
|
||||
|
||||
if tt.md != nil {
|
||||
ctx = metadata.NewOutgoingContext(ctx, tt.md)
|
||||
}
|
||||
|
||||
ctx = injectHTTPHeadersIntoGRPCRequest(ctx)
|
||||
md, _ := metadata.FromOutgoingContext(ctx)
|
||||
require.EqualValues(t, tt.expectMetadata, md)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractHTTPHeaderFromGRPCRequest(t *testing.T) {
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
md metadata.MD
|
||||
expectedResp string
|
||||
}{
|
||||
{
|
||||
name: "extracts header from metadata",
|
||||
md: metadata.New(map[string]string{httpreq.LokiDisablePipelineWrappersHeader: "true"}),
|
||||
expectedResp: "true",
|
||||
},
|
||||
{
|
||||
name: "non-nil metadata without header",
|
||||
md: metadata.New(map[string]string{"x-foo": "bar"}),
|
||||
},
|
||||
{
|
||||
name: "nil metadata",
|
||||
},
|
||||
} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := metadata.NewIncomingContext(context.Background(), tt.md)
|
||||
ctx = extractHTTPHeadersFromGRPCRequest(ctx)
|
||||
require.Equal(t, tt.expectedResp, httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader))
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user