diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 4b628cfff4..2c7ef2fccc 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -5027,10 +5027,11 @@ When a memberlist config with atleast 1 join_members is defined, kvstore of type
# CLI flag: -memberlist.cluster-label-verification-disabled
[cluster_label_verification_disabled: <boolean> | default = false]
-# Other cluster members to join. Can be specified multiple times. It can be an
-# IP, hostname or an entry specified in the DNS Service Discovery format.
+# Other cluster members to join. Can be specified multiple times or as a
+# comma-separated list. It can be an IP, hostname or an entry specified in the
+# DNS Service Discovery format.
# CLI flag: -memberlist.join
-[join_members: <list of strings> | default = []]
+[join_members: ]
# Min backoff duration to join other cluster members.
# CLI flag: -memberlist.min-join-backoff
@@ -5050,6 +5051,12 @@ When a memberlist config with atleast 1 join_members is defined, kvstore of type
# CLI flag: -memberlist.abort-if-fast-join-fails
[abort_if_cluster_fast_join_fails: <boolean> | default = false]
+# Minimum number of seed nodes that must be successfully joined during fast-join
+# for it to succeed. Only applies when -memberlist.abort-if-fast-join-fails is
+# enabled.
+# CLI flag: -memberlist.abort-if-fast-join-fails-min-nodes
+[abort_if_cluster_fast_join_fails_min_nodes: <int> | default = 1]
+
# Abort if this node fails to join memberlist cluster at startup. When enabled,
# it's not guaranteed that other services are started only after the cluster
# state has been successfully updated; use 'abort-if-fast-join-fails' instead.
@@ -5065,6 +5072,13 @@ When a memberlist config with atleast 1 join_members is defined, kvstore of type
# CLI flag: -memberlist.rejoin-interval
[rejoin_interval: <duration> | default = 0s]
+# Seed nodes to use for periodic rejoin. Takes precedence over -memberlist.join
+# for rejoining. If not specified, -memberlist.join is used. Can be specified
+# multiple times or as a comma-separated list. Supports IP, hostname, or DNS
+# Service Discovery format.
+# CLI flag: -memberlist.rejoin-seed-nodes
+[rejoin_seed_nodes: ]
+
# How long to keep LEFT ingesters in the ring.
# CLI flag: -memberlist.left-ingesters-timeout
[left_ingesters_timeout: <duration> | default = 5m]
@@ -6406,6 +6420,12 @@ cluster_validation:
# validation check.
# CLI flag: -server.cluster-validation.http.excluded-user-agents
[excluded_user_agents: | default = ""]
+
+# Creates new traces for each call rather than continuing the existing trace. A
+# span link is used to allow navigation to the parent trace. Only works when
+# using Open-Telemetry tracing.
+# CLI flag: -server.create-new-traces
+[create_new_traces: <boolean> | default = false]
```
### storage_config
diff --git a/go.mod b/go.mod
index 072a2f6d8c..715a4488b1 100644
--- a/go.mod
+++ b/go.mod
@@ -53,7 +53,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
- github.com/grafana/dskit v0.0.0-20260120145137-7f862deba99d
+ github.com/grafana/dskit v0.0.0-20260209132809-8d1c6d34bb5a
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/gomemcache v0.0.0-20251127154401-74f93547077b
github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853
diff --git a/go.sum b/go.sum
index 84176fe28e..afabb4fb28 100644
--- a/go.sum
+++ b/go.sum
@@ -651,8 +651,8 @@ github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5T
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
-github.com/grafana/dskit v0.0.0-20260120145137-7f862deba99d h1:V+19BJI3h0yqqQ94lmLgorGy3J+xmme8f4w3GFnYbew=
-github.com/grafana/dskit v0.0.0-20260120145137-7f862deba99d/go.mod h1:iHZFIKfU//1Uto0gmgmGBB1oVUBcTiepUFslatTkrAs=
+github.com/grafana/dskit v0.0.0-20260209132809-8d1c6d34bb5a h1:a/GywXjGvbhyra9fG7vqe8fN1xnNbCVIjWluZwus8d4=
+github.com/grafana/dskit v0.0.0-20260209132809-8d1c6d34bb5a/go.mod h1:M2NzmN1SL1duPeCTEJmttPyiAlYAxPS+G+Btza5F0Q8=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gomemcache v0.0.0-20251127154401-74f93547077b h1:5qp8/5YPt/Z2RW5QHsxvwE05+LWQYIXydP2MwOkMfb8=
diff --git a/pkg/querytee/proxy.go b/pkg/querytee/proxy.go
index 905bff87eb..0e8e306abe 100644
--- a/pkg/querytee/proxy.go
+++ b/pkg/querytee/proxy.go
@@ -439,7 +439,7 @@ func (p *Proxy) Start() error {
var handler http.Handler = router
// Configure tracing middleware to extract trace headers
// This ensures trace context is properly propagated from incoming requests
- tracer := middleware.NewTracer(nil, true, nil) // true enables trace header extraction
+ tracer := middleware.NewTracer(nil, true, nil, nil) // true enables trace header extraction
handler = tracer.Wrap(router)
level.Info(p.logger).Log("msg", "HTTP tracing middleware enabled with header extraction")
diff --git a/vendor/github.com/grafana/dskit/flagext/map.go b/vendor/github.com/grafana/dskit/flagext/map.go
index 78f95fb4d3..cdd624da09 100644
--- a/vendor/github.com/grafana/dskit/flagext/map.go
+++ b/vendor/github.com/grafana/dskit/flagext/map.go
@@ -4,7 +4,7 @@ import (
"encoding/json"
"fmt"
- "gopkg.in/yaml.v3"
+ "go.yaml.in/yaml/v3"
)
// LimitsMap is a flag.Value implementation that looks like a generic map, holding float64s, ints, or strings as values.
diff --git a/vendor/github.com/grafana/dskit/flagext/stringslicecsvmulti.go b/vendor/github.com/grafana/dskit/flagext/stringslicecsvmulti.go
new file mode 100644
index 0000000000..f480c60a52
--- /dev/null
+++ b/vendor/github.com/grafana/dskit/flagext/stringslicecsvmulti.go
@@ -0,0 +1,50 @@
+package flagext
+
+import "strings"
+
+// StringSliceCSVMulti is a slice of strings that can be populated in two ways, which may be combined:
+// - Multiple flag invocations (each call to Set appends to the existing values)
+// - Comma-separated values (split on commas; entries are not trimmed and empty entries are kept)
+// It implements flag.Value and also provides UnmarshalYAML/MarshalYAML methods.
+type StringSliceCSVMulti []string
+
+// String implements flag.Value. It returns all elements joined with commas.
+func (v StringSliceCSVMulti) String() string {
+ return strings.Join(v, ",")
+}
+
+// Set implements flag.Value. It splits s on commas and appends the pieces to the current values; an empty s is a no-op. It always returns nil.
+func (v *StringSliceCSVMulti) Set(s string) error {
+ if len(s) == 0 {
+ return nil
+ }
+ *v = append(*v, strings.Split(s, ",")...) // append rather than replace, so repeated flag invocations accumulate
+ return nil
+}
+
+// UnmarshalYAML implements yaml.Unmarshaler. It accepts either a single string (split on commas) or a list of strings, and replaces the current value rather than appending to it.
+func (v *StringSliceCSVMulti) UnmarshalYAML(unmarshal func(interface{}) error) error {
+ var s string
+ if err := unmarshal(&s); err == nil { // on error, fall through and try the list form instead
+ // String format: split on commas (no trimming); an empty string yields a nil slice.
+ if len(s) == 0 {
+ *v = nil
+ return nil
+ }
+ *v = strings.Split(s, ",")
+ return nil
+ }
+
+ // Fall back to list format; only this second decode error is returned to the caller.
+ var slice []string
+ if err := unmarshal(&slice); err != nil {
+ return err
+ }
+ *v = slice
+ return nil
+}
+
+// MarshalYAML implements yaml.Marshaler. It emits the comma-joined string produced by String, not a YAML list.
+func (v StringSliceCSVMulti) MarshalYAML() (interface{}, error) {
+ return v.String(), nil
+}
diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go b/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go
index 88b69cc647..cf2ca532fe 100644
--- a/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go
+++ b/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go
@@ -26,6 +26,8 @@ type StatusPageData struct {
Memberlist *memberlist.Memberlist
SortedMembers []*memberlist.Node
Store map[string]ValueDesc
+ StoreSizes map[string]int
+ TotalStoreSize int
MessageHistoryBufferBytes int
SentMessages []Message
ReceivedMessages []Message
@@ -101,11 +103,16 @@ func (h HTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
sent, received := kv.getSentAndReceivedMessages()
+ store := kv.storeCopy()
+ storeSizes, totalStoreSize := computeStoreSizes(kv, store)
+
v := StatusPageData{
Now: time.Now(),
Memberlist: kv.memberlist,
SortedMembers: members,
- Store: kv.storeCopy(),
+ Store: store,
+ StoreSizes: storeSizes,
+ TotalStoreSize: totalStoreSize,
MessageHistoryBufferBytes: kv.cfg.MessageHistoryBufferBytes,
SentMessages: sent,
ReceivedMessages: received,
@@ -128,6 +135,27 @@ func (h HTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
+func computeStoreSizes(kv *KV, store map[string]ValueDesc) (sizes map[string]int, total int) { // returns the encoded byte size of each store entry plus the sum of those sizes
+ sizes = make(map[string]int, len(store))
+ for key, val := range store {
+ if val.value == nil { // nothing to encode: the key gets no entry in sizes
+ continue
+ }
+ c := kv.GetCodec(val.CodecID)
+ if c == nil { // no codec returned for this CodecID: skip the key
+ continue
+ }
+ encoded, err := c.Encode(val.value)
+ if err != nil { // encoding errors are deliberately ignored: the key is left out of both sizes and total
+ continue
+ }
+ size := len(encoded)
+ sizes[key] = size
+ total += size
+ }
+ return
+}
+
func getFormat(req *http.Request) string {
const viewFormat = "format"
diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go
index 7ebb643738..ba8964adbc 100644
--- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go
+++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go
@@ -25,6 +25,7 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/services"
+ "github.com/grafana/dskit/timeutil"
)
const (
@@ -154,13 +155,15 @@ type KVConfig struct {
ClusterLabelVerificationDisabled bool `yaml:"cluster_label_verification_disabled" category:"advanced"`
// List of members to join
- JoinMembers flagext.StringSlice `yaml:"join_members"`
- MinJoinBackoff time.Duration `yaml:"min_join_backoff" category:"advanced"`
- MaxJoinBackoff time.Duration `yaml:"max_join_backoff" category:"advanced"`
- MaxJoinRetries int `yaml:"max_join_retries" category:"advanced"`
- AbortIfFastJoinFails bool `yaml:"abort_if_cluster_fast_join_fails" category:"advanced"`
- AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"`
- RejoinInterval time.Duration `yaml:"rejoin_interval" category:"advanced"`
+ JoinMembers flagext.StringSliceCSVMulti `yaml:"join_members"`
+ MinJoinBackoff time.Duration `yaml:"min_join_backoff" category:"advanced"`
+ MaxJoinBackoff time.Duration `yaml:"max_join_backoff" category:"advanced"`
+ MaxJoinRetries int `yaml:"max_join_retries" category:"advanced"`
+ AbortIfFastJoinFails bool `yaml:"abort_if_cluster_fast_join_fails" category:"advanced"`
+ AbortIfFastJoinFailsMinNodes int `yaml:"abort_if_cluster_fast_join_fails_min_nodes" category:"advanced"`
+ AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"`
+ RejoinInterval time.Duration `yaml:"rejoin_interval" category:"advanced"`
+ RejoinSeedNodes flagext.StringSliceCSVMulti `yaml:"rejoin_seed_nodes" category:"experimental"`
// Remove LEFT ingesters from ring after this timeout.
LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"`
@@ -203,13 +206,15 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.BoolVar(&cfg.RandomizeNodeName, prefix+"memberlist.randomize-node-name", true, "Add random suffix to the node name.")
f.DurationVar(&cfg.StreamTimeout, prefix+"memberlist.stream-timeout", 2*time.Second, "The timeout for establishing a connection with a remote node, and for read/write operations.")
f.IntVar(&cfg.RetransmitMult, prefix+"memberlist.retransmit-factor", mlDefaults.RetransmitMult, "Multiplication factor used when sending out messages (factor * log(N+1)).")
- f.Var(&cfg.JoinMembers, prefix+"memberlist.join", "Other cluster members to join. Can be specified multiple times. It can be an IP, hostname or an entry specified in the DNS Service Discovery format.")
+ f.Var(&cfg.JoinMembers, prefix+"memberlist.join", "Other cluster members to join. Can be specified multiple times or as a comma-separated list. It can be an IP, hostname or an entry specified in the DNS Service Discovery format.")
f.DurationVar(&cfg.MinJoinBackoff, prefix+"memberlist.min-join-backoff", 1*time.Second, "Min backoff duration to join other cluster members.")
f.DurationVar(&cfg.MaxJoinBackoff, prefix+"memberlist.max-join-backoff", 1*time.Minute, "Max backoff duration to join other cluster members.")
f.IntVar(&cfg.MaxJoinRetries, prefix+"memberlist.max-join-retries", 10, "Max number of retries to join other cluster members.")
f.BoolVar(&cfg.AbortIfFastJoinFails, prefix+"memberlist.abort-if-fast-join-fails", false, "Abort if this node fails the fast memberlist cluster joining procedure at startup. When enabled, it's guaranteed that other services, depending on memberlist, have an updated view over the cluster state when they're started.")
+ f.IntVar(&cfg.AbortIfFastJoinFailsMinNodes, prefix+"memberlist.abort-if-fast-join-fails-min-nodes", 1, "Minimum number of seed nodes that must be successfully joined during fast-join for it to succeed. Only applies when -memberlist.abort-if-fast-join-fails is enabled.")
f.BoolVar(&cfg.AbortIfJoinFails, prefix+"memberlist.abort-if-join-fails", cfg.AbortIfJoinFails, "Abort if this node fails to join memberlist cluster at startup. When enabled, it's not guaranteed that other services are started only after the cluster state has been successfully updated; use 'abort-if-fast-join-fails' instead.")
f.DurationVar(&cfg.RejoinInterval, prefix+"memberlist.rejoin-interval", 0, "If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix the cluster split issue, and is harmless otherwise. For example when using only few components as a seed nodes (via -memberlist.join), then it's recommended to use rejoin. If -memberlist.join points to dynamic service that resolves to all gossiping nodes (eg. Kubernetes headless service), then rejoin is not needed.")
+ f.Var(&cfg.RejoinSeedNodes, prefix+"memberlist.rejoin-seed-nodes", "Seed nodes to use for periodic rejoin. Takes precedence over -memberlist.join for rejoining. If not specified, -memberlist.join is used. Can be specified multiple times or as a comma-separated list. Supports IP, hostname, or DNS Service Discovery format.")
f.DurationVar(&cfg.LeftIngestersTimeout, prefix+"memberlist.left-ingesters-timeout", 5*time.Minute, "How long to keep LEFT ingesters in the ring.")
f.DurationVar(&cfg.ObsoleteEntriesTimeout, prefix+"memberlist.obsolete-entries-timeout", mlDefaults.PushPullInterval, "How long to keep obsolete entries in the KV store.")
f.DurationVar(&cfg.LeaveTimeout, prefix+"memberlist.leave-timeout", 20*time.Second, "Timeout for leaving memberlist cluster.")
@@ -247,6 +252,15 @@ func (cfg *KVConfig) Validate() error {
return cfg.ZoneAwareRouting.Validate()
}
+// GetRejoinSeedNodes returns the seed nodes to use for periodic rejoin.
+// If RejoinSeedNodes is non-empty, it returns that. Otherwise, it falls back to JoinMembers (which may itself be empty). The result is the config's own slice, not a copy.
+func (cfg *KVConfig) GetRejoinSeedNodes() []string {
+ if len(cfg.RejoinSeedNodes) > 0 {
+ return cfg.RejoinSeedNodes
+ }
+ return cfg.JoinMembers
+}
+
func generateRandomSuffix(logger log.Logger) string {
suffix := make([]byte, 4)
_, err := crypto_rand.Read(suffix)
@@ -612,11 +626,13 @@ func (m *KV) running(ctx context.Context) error {
}
var tickerChan <-chan time.Time
- if m.cfg.RejoinInterval > 0 && len(m.cfg.JoinMembers) > 0 {
- t := time.NewTicker(m.cfg.RejoinInterval)
- defer t.Stop()
-
- tickerChan = t.C
+ if m.cfg.RejoinInterval > 0 && len(m.cfg.GetRejoinSeedNodes()) > 0 {
+ // Use a random initial delay between 0 and RejoinInterval to uniformly
+ // distribute rejoins across time when multiple processes start simultaneously.
+ initialDelay := 1 + time.Duration(math_rand.Int63n(int64(m.cfg.RejoinInterval)))
+ var stop func()
+ stop, tickerChan = timeutil.NewVariableTicker(initialDelay, m.cfg.RejoinInterval)
+ defer stop()
}
var obsoleteEntriesTickerChan <-chan time.Time
@@ -632,7 +648,7 @@ func (m *KV) running(ctx context.Context) error {
select {
case <-tickerChan:
const numAttempts = 1 // don't retry if resolution fails, we will try again next time
- reached, err := m.joinMembersWithRetries(ctx, numAttempts, logger)
+ reached, err := m.joinMembersWithRetries(ctx, m.cfg.GetRejoinSeedNodes(), numAttempts, logger)
if err == nil {
level.Info(logger).Log("msg", "re-joined memberlist cluster", "reached_nodes", reached)
} else {
@@ -704,9 +720,9 @@ func (m *KV) fastJoinMembersOnStartup(ctx context.Context) error {
nodes = nodes[1:]
}
- if totalJoined == 0 {
- level.Warn(m.logger).Log("msg", "memberlist fast-join failed because no node has been successfully reached", "elapsed_time", time.Since(startTime))
- return fmt.Errorf("no memberlist node reached during fast-join procedure")
+ if totalJoined < m.cfg.AbortIfFastJoinFailsMinNodes {
+ level.Warn(m.logger).Log("msg", "memberlist fast-join failed to reach minimum required seed nodes", "joined_nodes", totalJoined, "required_nodes", m.cfg.AbortIfFastJoinFailsMinNodes, "elapsed_time", time.Since(startTime))
+ return fmt.Errorf("fast-join failed to reach minimum required seed nodes: joined %d, required %d", totalJoined, m.cfg.AbortIfFastJoinFailsMinNodes)
}
level.Info(m.logger).Log("msg", "memberlist fast-join finished", "joined_nodes", totalJoined, "elapsed_time", time.Since(startTime))
@@ -729,7 +745,7 @@ func (m *KV) joinMembersOnStartup(ctx context.Context) bool {
logger := log.With(m.logger, "phase", "startup")
level.Info(logger).Log("msg", "joining memberlist cluster", "join_members", strings.Join(m.cfg.JoinMembers, ","))
startTime := time.Now()
- reached, err := m.joinMembersWithRetries(ctx, m.cfg.MaxJoinRetries, logger)
+ reached, err := m.joinMembersWithRetries(ctx, m.cfg.JoinMembers, m.cfg.MaxJoinRetries, logger)
if err != nil {
level.Error(logger).Log("msg", "joining memberlist cluster failed", "err", err, "elapsed_time", time.Since(startTime))
return false
@@ -738,10 +754,10 @@ func (m *KV) joinMembersOnStartup(ctx context.Context) bool {
return true
}
-// joinMembersWithRetries joins m.cfg.JoinMembers 100 at a time. After each batch of 100 it rediscoveres the members.
+// joinMembersWithRetries joins the given members 100 at a time. After each batch of 100 it rediscovers the members.
// This helps when the list of members is big and by the time we reach the end the originally resolved addresses may be obsolete.
// joinMembersWithRetries returns an error iff it couldn't successfully join any node OR the context was cancelled.
-func (m *KV) joinMembersWithRetries(ctx context.Context, numAttempts int, logger log.Logger) (int, error) {
+func (m *KV) joinMembersWithRetries(ctx context.Context, members []string, numAttempts int, logger log.Logger) (int, error) {
var (
cfg = backoff.Config{
MinBackoff: m.cfg.MinJoinBackoff,
@@ -754,7 +770,7 @@ func (m *KV) joinMembersWithRetries(ctx context.Context, numAttempts int, logger
)
for ; boff.Ongoing(); boff.Wait() {
- successfullyJoined, err = m.joinMembersInBatches(ctx)
+ successfullyJoined, err = m.joinMembersInBatches(ctx, members)
if successfullyJoined > 0 {
// If there are _some_ successful joins, then we can consider the join done.
// Mimicking the Join semantics we return an error only when we couldn't join any node at all
@@ -770,10 +786,10 @@ func (m *KV) joinMembersWithRetries(ctx context.Context, numAttempts int, logger
return successfullyJoined, err
}
-// joinMembersInBatches joins m.cfg.JoinMembers and re-resolves the address of m.cfg.JoinMembers after joining 100 nodes.
+// joinMembersInBatches joins the given members and re-resolves their addresses after joining 100 nodes.
// joinMembersInBatches returns the number of nodes joined. joinMembersInBatches returns an error only when the
// number of joined nodes is 0.
-func (m *KV) joinMembersInBatches(ctx context.Context) (int, error) {
+func (m *KV) joinMembersInBatches(ctx context.Context, members []string) (int, error) {
const batchSize = 100
var (
attemptedNodes = make(map[string]bool)
@@ -789,7 +805,7 @@ func (m *KV) joinMembersInBatches(ctx context.Context) (int, error) {
//
// Ignores any DNS resolution error because it's not really actionable in this
// context.
- newlyResolved, _ := m.discoverMembersWithRetries(ctx, m.cfg.JoinMembers)
+ newlyResolved, _ := m.discoverMembersWithRetries(ctx, members)
if len(newlyResolved) > 0 {
// If the resolution fails we keep using the nodes list from the last resolution.
// If that failed too, then we fail the join attempt.
diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml b/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml
index 1b820ac1f1..fbd6c974f7 100644
--- a/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml
+++ b/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml
@@ -25,6 +25,7 @@
| Version |
Deleted |
Update Time |
+ Size |
Actions |
@@ -37,6 +38,7 @@
{{ $v.Version }} |
{{ $v.Deleted }} |
{{ $v.UpdateTime }} |
+ {{ index $.StoreSizes $k }} |
json
| json-pretty
@@ -46,10 +48,18 @@
{{ end }}
+
+ |
+
+ | Total: |
+ {{ .TotalStoreSize }} |
+ |
+
+
Note that value "version" is node-specific. It starts with 0 (on restart), and increases on each received update.
- Size is in bytes.
+ "Size" shows the encoded size in bytes.
Memberlist Cluster Members
diff --git a/vendor/github.com/grafana/dskit/middleware/http_tracing.go b/vendor/github.com/grafana/dskit/middleware/http_tracing.go
index f11ce05952..24397d5a5e 100644
--- a/vendor/github.com/grafana/dskit/middleware/http_tracing.go
+++ b/vendor/github.com/grafana/dskit/middleware/http_tracing.go
@@ -16,7 +16,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
- semconv "go.opentelemetry.io/otel/semconv/v1.37.0" // otelhttp uses semconv v1.37.0 so we stick to the same version in order to produce consistent attributes on HTTP and HTTPGRPC spans.
+ semconv "go.opentelemetry.io/otel/semconv/v1.39.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
@@ -36,11 +36,12 @@ type Tracer struct {
traceHeaders bool
httpHeadersToExclude map[string]bool
+ publicEndpointFn func(*http.Request) bool
}
// NewTracer creates a new tracer optionally configuring the tracing of HTTP headers.
// The configuration for HTTP headers tracing only applies to OpenTelemetry spans.
-func NewTracer(sourceIPs *SourceIPExtractor, traceHeaders bool, excludeHeaders []string) Tracer {
+func NewTracer(sourceIPs *SourceIPExtractor, traceHeaders bool, excludeHeaders []string, publicEndpointFn func(*http.Request) bool) Tracer {
httpHeadersToExclude := map[string]bool{}
for header := range AlwaysExcludedHeaders {
httpHeadersToExclude[header] = true
@@ -54,6 +55,7 @@ func NewTracer(sourceIPs *SourceIPExtractor, traceHeaders bool, excludeHeaders [
traceHeaders: traceHeaders,
httpHeadersToExclude: httpHeadersToExclude,
+ publicEndpointFn: publicEndpointFn,
}
}
@@ -152,9 +154,15 @@ func (t Tracer) wrapWithOTel(next http.Handler) http.Handler {
next.ServeHTTP(w, r)
})
- return otelhttp.NewHandler(addSpanAttributes, "http.tracing", otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
- return httpOperationName(r)
- }))
+ opts := []otelhttp.Option{
+ otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
+ return httpOperationName(r)
+ }),
+ }
+ if t.publicEndpointFn != nil {
+ opts = append(opts, otelhttp.WithPublicEndpointFn(t.publicEndpointFn))
+ }
+ return otelhttp.NewHandler(addSpanAttributes, "http.tracing", opts...)
}
// HTTPGRPCTracingInterceptor adds additional information about the encapsulated HTTP request
diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go
index 1ebe6ca282..27fab08d65 100644
--- a/vendor/github.com/grafana/dskit/ring/lifecycler.go
+++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go
@@ -623,7 +623,8 @@ func (i *Lifecycler) stopping(runningError error) error {
if runningError != nil {
// previously lifecycler just called os.Exit (from loop method)...
// now it stops more gracefully, but also without doing any cleanup
- return nil
+ level.Error(i.logger).Log("msg", "lifecycler loop() exited with error", "err", runningError)
+ return runningError
}
heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod)
diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go
index b41f3c5135..b4d740b373 100644
--- a/vendor/github.com/grafana/dskit/ring/ring.go
+++ b/vendor/github.com/grafana/dskit/ring/ring.go
@@ -559,19 +559,46 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
maxZones = r.cfg.ReplicationFactor
maxInstances = len(r.ringDesc.Ingesters)
- // We use a slice instead of a map because it's faster to search within a
- // slice than lookup a map for a very low number of items, we only expect
- // to have low single-digit number of hosts.
- distinctHosts = bufHosts[:0]
+ // distinctHosts tracks which instances we've already examined.
+ distinctHosts = newStringSet(bufHosts)
- examinedHostsPerZone = make(map[string]int)
- foundHostsPerZone = make(map[string]int)
+ // Fixed-size buffers for zone tracking slices. Using arrays with a fixed size allows
+ // the compiler to allocate them on the stack, avoiding heap allocations. The size of 5
+ // is chosen as a reasonable upper bound for zones (most deployments use 3).
+ totalHostsPerZoneBuf [5]int
+ examinedHostsPerZoneBuf [5]int
+ foundHostsPerZoneBuf [5]int
+
+ // These slices are indexed by the zone index, that is the index of a zone in r.ringZones.
+ // We use this technique – instead of a map – to optimize the lookup of the number of hosts by zones.
+ totalHostsPerZone []int
+ examinedHostsPerZone []int
+ foundHostsPerZone []int
targetHostsPerZone = max(1, replicationFactor/maxZones)
)
- for i := start; len(distinctHosts) < min(maxInstances, n) && iterations < len(r.ringTokens); i++ {
+ if r.cfg.ZoneAwarenessEnabled {
+ // Initialize the per-zone hosts counters only if zone-awareness is enabled.
+ // If zone-awareness is disabled and these slices get used by mistake, the code will intentionally panic.
+ if numZones := len(r.ringZones); numZones <= len(totalHostsPerZoneBuf) {
+ totalHostsPerZone = totalHostsPerZoneBuf[:numZones]
+ examinedHostsPerZone = examinedHostsPerZoneBuf[:numZones]
+ foundHostsPerZone = foundHostsPerZoneBuf[:numZones]
+ } else {
+ totalHostsPerZone = make([]int, numZones)
+ examinedHostsPerZone = make([]int, numZones)
+ foundHostsPerZone = make([]int, numZones)
+ }
+
+ // Pre-populate the total number of hosts per zone.
+ for zoneIndex, zone := range r.ringZones {
+ totalHostsPerZone[zoneIndex] = r.instancesCountPerZone[zone]
+ }
+ }
+
+ for i := start; distinctHosts.len() < min(maxInstances, n) && iterations < len(r.ringTokens); i++ {
// If we have the target number of instances or have looked at all instances in each zone, stop looking
- if r.cfg.ZoneAwarenessEnabled && r.canStopLooking(foundHostsPerZone, examinedHostsPerZone, targetHostsPerZone) {
+ if r.cfg.ZoneAwarenessEnabled && r.canStopLooking(totalHostsPerZone, foundHostsPerZone, examinedHostsPerZone, targetHostsPerZone) {
break
}
@@ -587,23 +614,34 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
}
// We want n *distinct* instances && distinct zones.
- if slices.Contains(distinctHosts, info.InstanceID) {
+ if distinctHosts.contains(info.InstanceID) {
continue
}
+ // Look for the zone index. We only need it if zone-awareness is enabled. If zone-awareness
+ // is disabled, we intentionally use a negative value so that if the index is used unintentionally,
+ // the code will panic.
+ zoneIndex := -1
+ if r.cfg.ZoneAwarenessEnabled {
+ zoneIndex = slices.Index(r.ringZones, info.Zone)
+ if zoneIndex == -1 {
+ return nil, errors.Wrapf(ErrInconsistentTokensInfo, "the zone %q is not present in the ring zones %v", info.Zone, r.ringZones)
+ }
+ }
+
if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
// If we already have the required number of instances for this zone, skip.
- if foundHostsPerZone[info.Zone] >= targetHostsPerZone {
+ if foundHostsPerZone[zoneIndex] >= targetHostsPerZone {
continue
}
// Keep track of the number of hosts we have examined in each zone. Once we've looked
// at every host in a zone, we can stop looking at each token: we won't find any more
// hosts to add to the replication set.
- examinedHostsPerZone[info.Zone]++
+ examinedHostsPerZone[zoneIndex]++
}
- distinctHosts = append(distinctHosts, info.InstanceID)
+ distinctHosts.add(info.InstanceID)
instance := r.ringDesc.Ingesters[info.InstanceID]
// Check whether the replica set should be extended given we're including
@@ -613,7 +651,7 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
} else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
// We should only increment the count for this zone if we are not going to
// extend, as we want to extend the instance in the same AZ.
- foundHostsPerZone[info.Zone]++
+ foundHostsPerZone[zoneIndex]++
}
include, keepGoing := true, true
@@ -633,9 +671,12 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
// canStopLooking returns true if we have enough hosts for the replication factor
// or if we have looked at all hosts, for all zones. This method assumes that the
// lock for ring state is held.
-func (r *Ring) canStopLooking(foundPerZone map[string]int, examinedPerZone map[string]int, targetPerZone int) bool {
- for zone, total := range r.instancesCountPerZone {
- zoneOk := foundPerZone[zone] >= targetPerZone || examinedPerZone[zone] >= total
+//
+// All input slices must have consistent zone indexes. It means that the index 0
+// of each slice must correspond to the same zone, and the same for all other indexes.
+func (r *Ring) canStopLooking(totalHostsPerZone []int, foundHostsPerZone []int, examinedHostsPerZone []int, targetPerZone int) bool {
+ for zoneIndex, total := range totalHostsPerZone {
+ zoneOk := foundHostsPerZone[zoneIndex] >= targetPerZone || examinedHostsPerZone[zoneIndex] >= total
if !zoneOk {
return false
}
diff --git a/vendor/github.com/grafana/dskit/ring/util.go b/vendor/github.com/grafana/dskit/ring/util.go
index 910860cc80..baa4c2188e 100644
--- a/vendor/github.com/grafana/dskit/ring/util.go
+++ b/vendor/github.com/grafana/dskit/ring/util.go
@@ -148,3 +148,60 @@ func tokenDistance(from, to uint32) int64 {
// the trailing +1 is needed to ensure that token 0 is counted
return math.MaxUint32 - int64(from) + int64(to) + 1
}
+
+// stringSet is a set of strings optimized for both small and large sizes.
+// It uses a slice for small sets (zero allocations when using a pre-allocated buffer)
+// and automatically switches to a map when the slice's capacity is exhausted.
+//
+// The rationale is that looking for a string in a small slice is faster than in a map: the
+// map implementation becomes faster – compared to a slice – the more the slice
+// gets bigger, so we switch to a map only when the slice is full.
+type stringSet struct {
+ setSlice []string // elements stored while the set still fits in the buffer's capacity
+ setMap map[string]struct{} // nil until the slice is full; once created it holds every element
+ count int // incremented on every add(); this is the value len() reports
+}
+
+func newStringSet(sliceBuf []string) *stringSet { // builds an empty set that reuses sliceBuf's backing array for the slice phase
+ return &stringSet{
+ setSlice: sliceBuf[:0], // length reset to 0; sliceBuf's capacity decides when add() switches to the map
+ setMap: nil,
+ }
+}
+
+// contains returns true if the set contains the given string. Once the map exists it is the only structure consulted; before that the slice is scanned linearly.
+func (s *stringSet) contains(str string) bool {
+ if s.setMap != nil {
+ _, ok := s.setMap[str]
+ return ok
+ }
+ return slices.Contains(s.setSlice, str)
+}
+
+// add adds a string to the set. This function does NOT check if the string is already present:
+// the caller MUST ensure that by calling contains() before add(), otherwise count (and so len()) over-reports.
+func (s *stringSet) add(str string) {
+ s.count++
+
+ if s.setMap != nil {
+ s.setMap[str] = struct{}{}
+ } else if len(s.setSlice) < cap(s.setSlice) {
+ s.setSlice = append(s.setSlice, str)
+ } else {
+ // Slice is full, switch to map. The slice is left as-is but is no longer consulted by contains().
+ s.setMap = make(map[string]struct{}, len(s.setSlice)*2)
+
+ // Copy the data from the slice to the map.
+ for _, h := range s.setSlice {
+ s.setMap[h] = struct{}{}
+ }
+
+ // Add the new string to the map.
+ s.setMap[str] = struct{}{}
+ }
+}
+
+// len returns the number of add() calls made so far, which equals the number of strings in the set as long as callers never add a duplicate.
+func (s *stringSet) len() int {
+ return s.count
+}
diff --git a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go
index 05a9f2fcd8..3414e206f9 100644
--- a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go
+++ b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go
@@ -19,7 +19,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
- "gopkg.in/yaml.v3"
+ "go.yaml.in/yaml/v3"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/services"
diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go
index ca2d5562f8..a1499add98 100644
--- a/vendor/github.com/grafana/dskit/server/server.go
+++ b/vendor/github.com/grafana/dskit/server/server.go
@@ -172,6 +172,13 @@ type Config struct {
Throughput Throughput `yaml:"-"`
ClusterValidation clusterutil.ServerClusterValidationConfig `yaml:"cluster_validation" category:"experimental"`
+
+ // PublicEndpointFn, when it returns true for a request, causes a new trace
+ // to be created instead of continuing an existing trace; a span link is used
+ // to connect to any existing trace. If nil and CreateNewTraces is set, every
+ // request gets a new trace. It only works when using OpenTelemetry tracing.
+ PublicEndpointFn func(*http.Request) bool `yaml:"-"`
+ CreateNewTraces bool `yaml:"create_new_traces"`
}
type Throughput struct {
@@ -241,6 +248,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.Throughput.LatencyCutoff, "server.throughput.latency-cutoff", 0, "Requests taking over the cutoff are be observed to measure throughput. Server-Timing header is used with specified unit as the indicator, for example 'Server-Timing: unit;val=8.2'. If set to 0, the throughput is not calculated.")
f.StringVar(&cfg.Throughput.Unit, "server.throughput.unit", "samples_processed", "Unit of the server throughput metric, for example 'processed_bytes' or 'samples_processed'. Observed values are gathered from the 'Server-Timing' header with the 'val' key. If set, it is appended to the request_server_throughput metric name.")
cfg.ClusterValidation.RegisterFlagsWithPrefix("server.cluster-validation.", f)
+ f.BoolVar(&cfg.CreateNewTraces, "server.create-new-traces", false, "Creates new traces for each call rather than continuing the existing trace. A span link is used to allow navigation to the parent trace. Only works when using Open-Telemetry tracing.")
}
func (cfg *Config) Validate() error {
@@ -589,11 +597,17 @@ func BuildHTTPMiddleware(cfg Config, router *mux.Router, metrics *Metrics, logge
defaultLogMiddleware := middleware.NewLogMiddleware(logger, cfg.LogRequestHeaders, cfg.LogRequestAtInfoLevel, logSourceIPs, strings.Split(cfg.LogRequestExcludeHeadersList, ","))
defaultLogMiddleware.DisableRequestSuccessLog = cfg.DisableRequestSuccessLog
+ publicEndpointFn := cfg.PublicEndpointFn
+ if publicEndpointFn == nil && cfg.CreateNewTraces {
+ publicEndpointFn = func(*http.Request) bool {
+ return true
+ }
+ }
defaultHTTPMiddleware := []middleware.Interface{
middleware.RouteInjector{
RouteMatcher: router,
},
- middleware.NewTracer(sourceIPs, cfg.TraceRequestHeaders, strings.Split(cfg.TraceRequestExcludeHeadersList, ",")),
+ middleware.NewTracer(sourceIPs, cfg.TraceRequestHeaders, strings.Split(cfg.TraceRequestExcludeHeadersList, ","), publicEndpointFn),
defaultLogMiddleware,
middleware.Instrument{
Duration: metrics.RequestDuration,
diff --git a/vendor/github.com/grafana/dskit/tenant/resolver.go b/vendor/github.com/grafana/dskit/tenant/resolver.go
index 9a01d6322c..dda44f0dfe 100644
--- a/vendor/github.com/grafana/dskit/tenant/resolver.go
+++ b/vendor/github.com/grafana/dskit/tenant/resolver.go
@@ -12,37 +12,46 @@ import (
// tenant ID. It returns an error user.ErrNoOrgID if there is no tenant ID
// supplied or user.ErrTooManyOrgIDs if there are multiple tenant IDs present.
//
+// If the orgID contains a subtenant (format "tenantID:subtenantID"), this function
+// strips the subtenant part and returns only the tenant ID. This ensures backward
+// compatibility with existing code that is not subtenant-aware.
+//
+// The SubtenantID is not validated.
+//
// ignore stutter warning
//
//nolint:revive
func TenantID(ctx context.Context) (string, error) {
//lint:ignore faillint wrapper around upstream method
- orgID, err := user.ExtractOrgID(ctx)
+ orgIDs, err := user.ExtractOrgID(ctx)
if err != nil {
return "", err
}
- if !strings.Contains(orgID, tenantIDsSeparator) {
- if err := ValidTenantID(orgID); err != nil {
- return "", err
+ orgID, remaining, hasMoreIDs := stringsCut(orgIDs, tenantIDsSeparator)
+ tenantID := trimSubtenantID(orgID)
+ if err := ValidTenantID(tenantID); err != nil {
+ return "", err
+ }
+ for hasMoreIDs {
+ orgID, remaining, hasMoreIDs = stringsCut(remaining, tenantIDsSeparator)
+ if tenantID != trimSubtenantID(orgID) {
+ return "", user.ErrTooManyOrgIDs
}
- return orgID, nil
}
- orgIDs, err := tenantIDsFromString(orgID)
- if err != nil {
- return "", err
- }
-
- if len(orgIDs) > 1 {
- return "", user.ErrTooManyOrgIDs
- }
-
- return orgIDs[0], nil
+ return tenantID, nil
}
// TenantIDs returns all tenant IDs from the context. It should return
// normalized list of ordered and distinct tenant IDs (as produced by
// NormalizeTenantIDs).
//
+// If the orgID contains subtenants (format "tenantID:subtenantID" or
+// "tenant1:sub1|tenant2:sub2"), this function strips the subtenant parts
+// and returns only the tenant IDs. This ensures backward compatibility
+// with existing code that is not subtenant-aware.
+//
+// SubtenantIDs are not validated.
+//
// ignore stutter warning
//
//nolint:revive
@@ -52,19 +61,84 @@ func TenantIDs(ctx context.Context) ([]string, error) {
if err != nil {
return nil, err
}
-
- return tenantIDsFromString(orgID)
+ return parseTenantIDs(orgID)
}
-func tenantIDsFromString(orgID string) ([]string, error) {
- orgIDs := strings.Split(orgID, tenantIDsSeparator)
- for _, id := range orgIDs {
- if err := ValidTenantID(id); err != nil {
+func parseTenantIDs(orgID string) ([]string, error) {
+ orgIDs := strings.Split(orgID, string(tenantIDsSeparator))
+ for i, part := range orgIDs {
+ tenantId := trimSubtenantID(part)
+ if err := ValidTenantID(tenantId); err != nil {
return nil, err
}
+ orgIDs[i] = tenantId
+ }
+ return NormalizeTenantIDs(orgIDs), nil
+}
+
+// SubtenantID returns the tenant ID and the subtenant ID from the context; the subtenant ID is
+// empty if no subtenant is present. The orgID format is "tenantID:subtenantID" (e.g., "123456:k6").
+//
+//nolint:revive
+func SubtenantID(ctx context.Context) (tenantID string, subtenantID string, _ error) {
+ //lint:ignore faillint wrapper around upstream method
+ orgIDs, err := user.ExtractOrgID(ctx)
+ if err != nil {
+ return "", "", err
}
- return NormalizeTenantIDs(orgIDs), nil
+ orgID, remaining, hasMoreIDs := stringsCut(orgIDs, tenantIDsSeparator)
+ tenantID, subtenantID = splitTenantAndSubtenant(orgID)
+ if err := ValidTenantID(tenantID); err != nil {
+ return "", "", err
+ }
+ if err := ValidSubtenantID(subtenantID); err != nil {
+ return "", "", err
+ }
+ var nextOrgID string
+ for hasMoreIDs {
+ nextOrgID, remaining, hasMoreIDs = stringsCut(remaining, tenantIDsSeparator)
+ // Compare the entire orgID: every additional orgID must be identical to the first one,
+ // which has already been validated, so there is no need to split and validate it again.
+ if orgID != nextOrgID {
+ return "", "", user.ErrTooManyOrgIDs
+ }
+ }
+ return tenantID, subtenantID, nil
+}
+
+// SubtenantIDs returns a normalized list of all subtenant IDs from the context
+// (as produced by NormalizeTenantIDs). Empty subtenants are omitted from the
+// result.
+//
+// ignore stutter warning
+//
+//nolint:revive
+func SubtenantIDs(ctx context.Context) ([]string, error) {
+ //lint:ignore faillint wrapper around upstream method
+ orgID, err := user.ExtractOrgID(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return parseSubtenantIDs(orgID)
+}
+
+func parseSubtenantIDs(orgID string) ([]string, error) {
+ parts := strings.Split(orgID, string(tenantIDsSeparator))
+ var subtenantIDs []string
+ for _, part := range parts {
+ tenantID, subtenantID := splitTenantAndSubtenant(part)
+ if err := ValidTenantID(tenantID); err != nil {
+ return nil, err
+ }
+ if subtenantID != "" {
+ if err := ValidSubtenantID(subtenantID); err != nil {
+ return nil, err
+ }
+ subtenantIDs = append(subtenantIDs, subtenantID)
+ }
+ }
+ return NormalizeTenantIDs(subtenantIDs), nil
}
type Resolver interface {
@@ -78,10 +152,21 @@ type Resolver interface {
// normalized list of ordered and distinct tenant IDs (as produced by
// NormalizeTenantIDs).
TenantIDs(context.Context) ([]string, error)
+
+ // SubtenantID returns the tenant ID and subtenant ID from the context.
+ // Returns empty subtenant if no subtenant is present.
+ // The orgID format is "tenantID:subtenantID".
+ SubtenantID(context.Context) (string, string, error)
+
+ // SubtenantIDs returns all subtenant IDs from the context. It should return
+ // a normalized list of ordered and distinct subtenant IDs.
+ SubtenantIDs(context.Context) ([]string, error)
}
type MultiResolver struct{}
+var _ Resolver = NewMultiResolver()
+
// NewMultiResolver creates a tenant resolver, which allows request to have
// multiple tenant ids submitted separated by a '|' character. This enforces
// further limits on the character set allowed within tenants as detailed here:
@@ -97,3 +182,11 @@ func (t *MultiResolver) TenantID(ctx context.Context) (string, error) {
func (t *MultiResolver) TenantIDs(ctx context.Context) ([]string, error) {
return TenantIDs(ctx)
}
+
+func (t *MultiResolver) SubtenantID(ctx context.Context) (string, string, error) {
+ return SubtenantID(ctx)
+}
+
+func (t *MultiResolver) SubtenantIDs(ctx context.Context) ([]string, error) {
+ return SubtenantIDs(ctx)
+}
diff --git a/vendor/github.com/grafana/dskit/tenant/tenant.go b/vendor/github.com/grafana/dskit/tenant/tenant.go
index 4a89b57225..d5f10c4080 100644
--- a/vendor/github.com/grafana/dskit/tenant/tenant.go
+++ b/vendor/github.com/grafana/dskit/tenant/tenant.go
@@ -15,14 +15,40 @@ const (
// MaxTenantIDLength is the max length of single tenant ID in bytes
MaxTenantIDLength = 150
- tenantIDsSeparator = "|"
+ tenantIDsSeparator = '|'
+
+ // subtenantIDSeparator separates the tenant ID from the subtenant ID.
+ // The format is "tenantID:subtenantID" (e.g., "123456:k6").
+ // The colon is not a valid character in tenant IDs, making it safe to use as a separator.
+ subtenantIDSeparator = ':'
)
var (
+ // validTenantIdChars is a lookup table for valid tenant ID characters.
+ validTenantIdChars [256]bool
+
errTenantIDTooLong = fmt.Errorf("tenant ID is too long: max %d characters", MaxTenantIDLength)
errUnsafeTenantID = errors.New("tenant ID is '.' or '..'")
)
+func init() {
+ // letters
+ for c := 'a'; c <= 'z'; c++ {
+ validTenantIdChars[c] = true
+ }
+ for c := 'A'; c <= 'Z'; c++ {
+ validTenantIdChars[c] = true
+ }
+ // digits
+ for c := '0'; c <= '9'; c++ {
+ validTenantIdChars[c] = true
+ }
+ // special characters: ! - _ . * ' ( )
+ for _, c := range "!-_.*'()" {
+ validTenantIdChars[c] = true
+ }
+}
+
type errTenantIDUnsupportedCharacter struct {
pos int
tenantID string
@@ -56,32 +82,31 @@ func NormalizeTenantIDs(tenantIDs []string) []string {
return tenantIDs[0:posOut]
}
-// ValidTenantID returns an error if the single tenant ID is invalid, nil otherwise
+// ValidTenantID returns an error if the tenant ID is invalid, nil otherwise.
func ValidTenantID(s string) error {
- // check if it contains invalid runes
- for pos, r := range s {
- if !isSupported(r) {
- return &errTenantIDUnsupportedCharacter{
- tenantID: s,
- pos: pos,
- }
+ for i := 0; i < len(s); i++ {
+ if !validTenantIdChars[s[i]] {
+ return &errTenantIDUnsupportedCharacter{tenantID: s, pos: i}
}
}
-
if len(s) > MaxTenantIDLength {
return errTenantIDTooLong
}
-
- if containsUnsafePathSegments(s) {
+ if s == "." || s == ".." {
return errUnsafeTenantID
}
-
return nil
}
+// ValidSubtenantID returns an error if the subtenant ID is invalid, nil otherwise.
+// Subtenant IDs follow the same rules as tenant IDs.
+func ValidSubtenantID(s string) error {
+ return ValidTenantID(s)
+}
+
// JoinTenantIDs returns all tenant IDs concatenated with the separator character `|`
func JoinTenantIDs(tenantIDs []string) string {
- return strings.Join(tenantIDs, tenantIDsSeparator)
+ return strings.Join(tenantIDs, string(tenantIDsSeparator))
}
// ExtractTenantIDFromHTTPRequest extracts a single tenant ID directly from a HTTP request.
@@ -109,32 +134,30 @@ func TenantIDsFromOrgID(orgID string) ([]string, error) {
return TenantIDs(user.InjectOrgID(context.TODO(), orgID))
}
-// this checks if a rune is supported in tenant IDs (according to
-// https://grafana.com/docs/mimir/latest/configure/about-tenant-ids/
-func isSupported(c rune) bool {
- // characters
- if ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z') {
- return true
+func trimSubtenantID(orgID string) string {
+ idx := strings.IndexByte(orgID, subtenantIDSeparator)
+ if idx == -1 {
+ return orgID
}
-
- // digits
- if '0' <= c && c <= '9' {
- return true
- }
-
- // special
- return c == '!' ||
- c == '-' ||
- c == '_' ||
- c == '.' ||
- c == '*' ||
- c == '\'' ||
- c == '(' ||
- c == ')'
+ return orgID[:idx]
}
-// containsUnsafePathSegments will return true if the string is a directory
-// reference like `.` and `..`
-func containsUnsafePathSegments(id string) bool {
- return id == "." || id == ".."
+// splitTenantAndSubtenant splits an orgID into tenant ID and subtenant ID.
+// If the orgID contains no subtenant separator, the subtenant will be empty.
+// The format is "tenantID:subtenantID" (e.g., "123456:k6").
+func splitTenantAndSubtenant(orgID string) (tenantID, subtenantID string) {
+ idx := strings.IndexByte(orgID, subtenantIDSeparator)
+ if idx == -1 {
+ return orgID, ""
+ }
+ return orgID[:idx], orgID[idx+1:]
+}
+
+// stringsCut is like strings.Cut but takes a single-byte separator and uses strings.IndexByte.
+func stringsCut(s string, sep byte) (string, string, bool) {
+ idx := strings.IndexByte(s, sep)
+ if idx == -1 {
+ return s, "", false
+ }
+ return s[:idx], s[idx+1:], true
}
diff --git a/vendor/github.com/grafana/dskit/timeutil/variable_ticker.go b/vendor/github.com/grafana/dskit/timeutil/variable_ticker.go
new file mode 100644
index 0000000000..6b7b6a9867
--- /dev/null
+++ b/vendor/github.com/grafana/dskit/timeutil/variable_ticker.go
@@ -0,0 +1,65 @@
+package timeutil
+
+import (
+ "sync"
+ "time"
+)
+
+// NewVariableTicker wraps time.Ticker and calls Reset() with the next duration (picked in order
+// from the input durations) after each tick. Once all durations have been applied, the ticker
+// keeps ticking at the last one. It panics if no duration is given.
+//
+// Returns a function for stopping the ticker, and the ticker channel.
+func NewVariableTicker(durations ...time.Duration) (func(), <-chan time.Time) {
+ if len(durations) == 0 {
+ panic("at least 1 duration required")
+ }
+
+ // Init the ticker with the 1st duration.
+ ticker := time.NewTicker(durations[0])
+ durations = durations[1:]
+
+ // If there was only 1 duration we can simply return the built-in ticker.
+ if len(durations) == 0 {
+ return ticker.Stop, ticker.C
+ }
+
+ // Create a channel over which our ticks will be sent.
+ ticks := make(chan time.Time, 1)
+
+ // Create a channel used to signal once this ticker is stopped.
+ stopped := make(chan struct{})
+
+ go func() {
+ for {
+ select {
+ case ts := <-ticker.C:
+ if len(durations) > 0 {
+ ticker.Reset(durations[0])
+ durations = durations[1:]
+ }
+
+ // Blocking send that is abandoned once stopped, to avoid a goroutine leak if stopped while consumer is slow.
+ select {
+ case ticks <- ts:
+ case <-stopped:
+ return
+ }
+
+ case <-stopped:
+ // Interrupt the loop once stopped.
+ return
+ }
+ }
+ }()
+
+ stopOnce := sync.Once{}
+ stop := func() {
+ stopOnce.Do(func() {
+ ticker.Stop()
+ close(stopped)
+ })
+ }
+
+ return stop, ticks
+}
diff --git a/vendor/github.com/grafana/dskit/tracing/otel.go b/vendor/github.com/grafana/dskit/tracing/otel.go
index b9ad904ccf..dfc554b6c0 100644
--- a/vendor/github.com/grafana/dskit/tracing/otel.go
+++ b/vendor/github.com/grafana/dskit/tracing/otel.go
@@ -26,7 +26,7 @@ import (
"go.opentelemetry.io/otel/trace"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
- semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
+ semconv "go.opentelemetry.io/otel/semconv/v1.39.0"
)
var tracer = otel.Tracer("dskit/tracing")
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 985da3c843..6e9edb34d6 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1206,7 +1206,7 @@ github.com/gorilla/websocket
# github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
## explicit; go 1.17
github.com/grafana/cloudflare-go
-# github.com/grafana/dskit v0.0.0-20260120145137-7f862deba99d
+# github.com/grafana/dskit v0.0.0-20260209132809-8d1c6d34bb5a
## explicit; go 1.24.0
github.com/grafana/dskit/backoff
github.com/grafana/dskit/cancellation
@@ -1247,6 +1247,7 @@ github.com/grafana/dskit/signals
github.com/grafana/dskit/spanlogger
github.com/grafana/dskit/tenant
github.com/grafana/dskit/test
+github.com/grafana/dskit/timeutil
github.com/grafana/dskit/tracing
github.com/grafana/dskit/user
# github.com/grafana/go-gelf/v2 v2.0.1