diff --git a/pkg/kafka/partition/metrics.go b/pkg/kafka/partition/metrics.go
new file mode 100644
index 0000000000..38afac64fa
--- /dev/null
+++ b/pkg/kafka/partition/metrics.go
@@ -0,0 +1,86 @@
+package partition
+
+import (
+	"math"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/twmb/franz-go/plugin/kprom"
+
+	"github.com/grafana/loki/v3/pkg/kafka/client"
+)
+
+// readerMetrics groups the Prometheus collectors used by the partition Reader.
+//
+// NOTE(review): newReaderMetrics does not assign consumeLatency, so that field
+// is nil on the value it returns; confirm nothing observes it, or initialize it.
+type readerMetrics struct {
+	phase             *prometheus.GaugeVec
+	receiveDelay      *prometheus.HistogramVec
+	recordsPerFetch   prometheus.Histogram
+	fetchesErrors     prometheus.Counter
+	fetchesTotal      prometheus.Counter
+	fetchWaitDuration prometheus.Histogram
+	consumeLatency    prometheus.Histogram
+	kprom             *kprom.Metrics
+}
+
+// newReaderMetrics builds the metrics for the PartitionReader; collectors made
+// through promauto are registered on r. Both phase series of receiveDelay are
+// created eagerly so they are exported before the first record is observed.
+func newReaderMetrics(r prometheus.Registerer) readerMetrics {
+	m := readerMetrics{
+		phase: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "loki_ingest_storage_reader_phase",
+			Help: "The current phase of the consumer.",
+		}, []string{"phase"}),
+		receiveDelay: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+			Name:                            "loki_ingest_storage_reader_receive_delay_seconds",
+			Help:                            "Delay between producing a record and receiving it in the consumer.",
+			NativeHistogramZeroThreshold:    math.Pow(2, -10), // Values below this will be considered to be 0. Equals to 0.0009765625, or about 1ms.
+			NativeHistogramBucketFactor:     1.2,              // We use higher factor (scheme=2) to have wider spread of buckets.
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: 1 * time.Hour,
+			Buckets:                         prometheus.ExponentialBuckets(0.125, 2, 18), // 18 buckets from 125ms up to 16384s (~4.5h).
+		}, []string{"phase"}),
+		kprom: client.NewReaderClientMetrics("partition-reader", r),
+		fetchWaitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_ingest_storage_reader_records_batch_wait_duration_seconds",
+			Help:                        "How long a consumer spent waiting for a batch of records from the Kafka client. If fetching is faster than processing, then this will be close to 0.",
+			NativeHistogramBucketFactor: 1.1,
+		}),
+		recordsPerFetch: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:    "loki_ingest_storage_reader_records_per_fetch",
+			Help:    "The number of records received by the consumer in a single fetch operation.",
+			Buckets: prometheus.ExponentialBuckets(1, 2, 15),
+		}),
+		fetchesErrors: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name: "loki_ingest_storage_reader_fetch_errors_total",
+			Help: "The number of fetch errors encountered by the consumer.",
+		}),
+		fetchesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name: "loki_ingest_storage_reader_fetches_total",
+			Help: "Total number of Kafka fetches received by the consumer.",
+		}),
+	}
+
+	// The previous constructor resolved both label values up front, which made
+	// the two series visible from startup; keep doing that here.
+	m.receiveDelay.WithLabelValues(phaseStarting)
+	m.receiveDelay.WithLabelValues(phaseRunning)
+
+	return m
+}
+
+// reportStarting sets the "starting" phase gauge to 1 and "running" to 0.
+func (m *readerMetrics) reportStarting() {
+	m.phase.WithLabelValues(phaseStarting).Set(1)
+	m.phase.WithLabelValues(phaseRunning).Set(0)
+}
+
+// reportRunning sets the "running" phase gauge to 1 and "starting" to 0.
+func (m *readerMetrics) reportRunning() {
+	m.phase.WithLabelValues(phaseStarting).Set(0)
+	m.phase.WithLabelValues(phaseRunning).Set(1)
+}
diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go
index a1008d88c8..2e05a4200a 100644
--- a/pkg/kafka/partition/reader.go
+++ b/pkg/kafka/partition/reader.go
@@ -3,7 +3,6 @@ package partition
 import (
 	"context"
 	"fmt"
-	"math"
 	"time"
 
 	"github.com/coder/quartz"
@@ -14,12 +13,10 @@ import (
 	"github.com/grafana/dskit/services"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/pkg/kmsg"
-	"github.com/twmb/franz-go/plugin/kprom"
 
 	"github.com/grafana/loki/v3/pkg/kafka"
 	"github.com/grafana/loki/v3/pkg/kafka/client"
@@ -30,6 +27,9 @@ var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadli
 const (
 	kafkaStartOffset = -2
 	kafkaEndOffset   = -1
+
+	phaseStarting = "starting"
+	phaseRunning  = "running"
 )
 
 // Reader is responsible for reading data from a specific Kafka partition
@@ -99,9 +99,7 @@ func (p *Reader) start(ctx context.Context) error {
 	if err != nil {
 		return errors.Wrap(err, "creating kafka reader client")
 	}
-
-	p.metrics.phase.WithLabelValues("starting").Set(1)
-	p.metrics.phase.WithLabelValues("running").Set(0)
+	p.metrics.reportStarting()
 
 	// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
 	lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
@@ -144,8 +142,7 @@ func (p *Reader) start(ctx context.Context) error {
 // data from Kafka, and send it to the consumer.
 func (p *Reader) run(ctx context.Context) error {
 	level.Info(p.logger).Log("msg", "starting partition reader", "partition", p.partitionID, "consumer_group", p.consumerGroup)
-	p.metrics.phase.WithLabelValues("starting").Set(0)
-	p.metrics.phase.WithLabelValues("running").Set(1)
+	p.metrics.reportRunning()
 
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -484,9 +481,9 @@ func (p *Reader) recordFetchesMetrics(fetches kgo.Fetches) {
 		numRecords++
 		delay := now.Sub(record.Timestamp).Seconds()
 		if p.Service.State() == services.Starting {
-			p.metrics.receiveDelayWhenStarting.Observe(delay)
+			p.metrics.receiveDelay.WithLabelValues(phaseStarting).Observe(delay)
 		} else {
-			p.metrics.receiveDelayWhenRunning.Observe(delay)
+			p.metrics.receiveDelay.WithLabelValues(phaseRunning).Observe(delay)
 		}
 	})
 
@@ -517,58 +514,3 @@ func isErrFetch(fetch kgo.Fetch) bool {
 	}
 	return false
 }
-
-type readerMetrics struct {
-	phase                    *prometheus.GaugeVec
-	receiveDelayWhenStarting prometheus.Observer
-	receiveDelayWhenRunning  prometheus.Observer
-	recordsPerFetch          prometheus.Histogram
-	fetchesErrors            prometheus.Counter
-	fetchesTotal             prometheus.Counter
-	fetchWaitDuration        prometheus.Histogram
-	// strongConsistencyInstrumentation *StrongReadConsistencyInstrumentation[struct{}]
-	// lastConsumedOffset prometheus.Gauge
-	consumeLatency prometheus.Histogram
-	kprom          *kprom.Metrics
-}
-
-// newReaderMetrics initializes and returns a new set of metrics for the PartitionReader.
-func newReaderMetrics(reg prometheus.Registerer) readerMetrics {
-	receiveDelay := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
-		Name:                            "loki_ingest_storage_reader_receive_delay_seconds",
-		Help:                            "Delay between producing a record and receiving it in the consumer.",
-		NativeHistogramZeroThreshold:    math.Pow(2, -10), // Values below this will be considered to be 0. Equals to 0.0009765625, or about 1ms.
-		NativeHistogramBucketFactor:     1.2,              // We use higher factor (scheme=2) to have wider spread of buckets.
-		NativeHistogramMaxBucketNumber:  100,
-		NativeHistogramMinResetDuration: 1 * time.Hour,
-		Buckets:                         prometheus.ExponentialBuckets(0.125, 2, 18), // Buckets between 125ms and 9h.
-	}, []string{"phase"})
-
-	return readerMetrics{
-		phase: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
-			Name: "loki_ingest_storage_reader_phase",
-			Help: "The current phase of the consumer.",
-		}, []string{"phase"}),
-		receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"),
-		receiveDelayWhenRunning:  receiveDelay.WithLabelValues("running"),
-		kprom:                    client.NewReaderClientMetrics("partition-reader", reg),
-		fetchWaitDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
-			Name:                        "loki_ingest_storage_reader_records_batch_wait_duration_seconds",
-			Help:                        "How long a consumer spent waiting for a batch of records from the Kafka client. If fetching is faster than processing, then this will be close to 0.",
-			NativeHistogramBucketFactor: 1.1,
-		}),
-		recordsPerFetch: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
-			Name:    "loki_ingest_storage_reader_records_per_fetch",
-			Help:    "The number of records received by the consumer in a single fetch operation.",
-			Buckets: prometheus.ExponentialBuckets(1, 2, 15),
-		}),
-		fetchesErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
-			Name: "loki_ingest_storage_reader_fetch_errors_total",
-			Help: "The number of fetch errors encountered by the consumer.",
-		}),
-		fetchesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
-			Name: "loki_ingest_storage_reader_fetches_total",
-			Help: "Total number of Kafka fetches received by the consumer.",
-		}),
-	}
-}