package lokihttp

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/grafana/dskit/backoff"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/config"
	"github.com/prometheus/common/model"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/setting"
)

const (
	contentType  = "application/x-protobuf"
	maxErrMsgLen = 1024

	// Label reserved to override the tenant ID while processing
	// pipeline stages
	ReservedLabelTenantID = "__tenant_id__"

	LatencyLabel = "filename"
	HostLabel    = "host"
)

var UserAgent = fmt.Sprintf("grafana/%s", setting.BuildVersion)

type metrics struct {
	encodedBytes     *prometheus.CounterVec
	sentBytes        *prometheus.CounterVec
	droppedBytes     *prometheus.CounterVec
	sentEntries      *prometheus.CounterVec
	droppedEntries   *prometheus.CounterVec
	requestDuration  *prometheus.HistogramVec
	batchRetries     *prometheus.CounterVec
	countersWithHost []*prometheus.CounterVec
}

func newMetrics(reg prometheus.Registerer) *metrics {
	var m metrics

	m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "encoded_bytes_total",
		Help:      "Number of bytes encoded and ready to send.",
	}, []string{HostLabel})
	m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "sent_bytes_total",
		Help:      "Number of bytes sent.",
	}, []string{HostLabel})
	m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "dropped_bytes_total",
		Help:      "Number of bytes dropped because they failed to be sent to the ingester after all retries.",
	}, []string{HostLabel})
	m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "sent_entries_total",
		Help:      "Number of log entries sent to the ingester.",
	}, []string{HostLabel})
	m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "dropped_entries_total",
		Help:      "Number of log entries dropped because they failed to be sent to the ingester after all retries.",
	}, []string{HostLabel})
	m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "promtail",
		Name:      "request_duration_seconds",
		Help:      "Duration of send requests.",
	}, []string{"status_code", HostLabel})
	m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "batch_retries_total",
		Help:      "Number of times batches have had to be retried.",
	}, []string{HostLabel})

	m.countersWithHost = []*prometheus.CounterVec{
		m.encodedBytes, m.sentBytes, m.droppedBytes, m.sentEntries, m.droppedEntries,
	}

	if reg != nil {
		m.encodedBytes = mustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec)
		m.sentBytes = mustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
		m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
		m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
		m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
		m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
		m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
	}

	return &m
}

// mustRegisterOrGet registers the collector, reusing an already registered
// collector instead of failing, and panics on any other registration error.
func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
	if err := reg.Register(c); err != nil {
		promError := prometheus.AlreadyRegisteredError{}
		if errors.As(err, &promError) {
			return promError.ExistingCollector
		}
		panic(err)
	}
	return c
}
// Client pushes entries to Loki and can be stopped.
type Client interface {
	Chan() chan<- Entry
	Stop()
	StopNow()
}

// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
	metrics *metrics
	logger  log.Logger
	cfg     Config
	client  *http.Client
	entries chan Entry

	once sync.Once
	wg   sync.WaitGroup

	// ctx is used in any upstream calls from the `client`.
	ctx    context.Context
	cancel context.CancelFunc
}

// Tripperware can wrap a roundtripper.
type Tripperware func(http.RoundTripper) http.RoundTripper

// New makes a new Client.
func New(reg prometheus.Registerer, cfg Config, logger log.Logger) (Client, error) {
	return newClient(reg, cfg, logger)
}

func newClient(reg prometheus.Registerer, cfg Config, logger log.Logger) (*client, error) {
	if cfg.URL.URL == nil {
		return nil, errors.New("client needs target URL")
	}

	ctx, cancel := context.WithCancel(context.Background())

	c := &client{
		logger:  logger.New("host", cfg.URL.Host),
		cfg:     cfg,
		entries: make(chan Entry),
		metrics: newMetrics(reg),

		ctx:    ctx,
		cancel: cancel,
	}

	err := cfg.Client.Validate()
	if err != nil {
		return nil, err
	}

	c.client, err = config.NewClientFromConfig(cfg.Client, "promtail", config.WithHTTP2Disabled())
	if err != nil {
		return nil, err
	}

	c.client.Timeout = cfg.Timeout

	// Initialize counters to 0 so the metrics are exported before the first
	// occurrence of incrementing to avoid missing metrics.
	for _, counter := range c.metrics.countersWithHost {
		counter.WithLabelValues(c.cfg.URL.Host).Add(0)
	}

	c.wg.Add(1)
	go c.run()
	return c, nil
}

// NewWithTripperware creates a new Loki client with a custom tripperware.
func NewWithTripperware(reg prometheus.Registerer, cfg Config, logger log.Logger, tp Tripperware) (Client, error) {
	c, err := newClient(reg, cfg, logger)
	if err != nil {
		return nil, err
	}

	if tp != nil {
		c.client.Transport = tp(c.client.Transport)
	}

	return c, nil
}
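
// Usage sketch (illustrative, not part of the original file): a minimal flow
// a caller might follow, assuming a populated Config value and an Entry built
// elsewhere in this package.
//
//	c, err := New(prometheus.DefaultRegisterer, cfg, logger)
//	if err != nil {
//		return err
//	}
//	defer c.Stop()   // Stop closes the entry channel and flushes pending batches.
//	c.Chan() <- e    // Entries are batched per tenant and sent asynchronously by run().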

func (c *client) run() {
	batches := map[string]*batch{}

	// Given the client handles multiple batches (1 per tenant) and each batch
	// can be created at a different point in time, we check for batches whose
	// max wait time has been reached ten times per BatchWait, so that the
	// maximum extra delay before a batch is sent is 10% of the max waiting time.
	// The check period is floored at 10ms to avoid too frequent checks in case
	// the BatchWait is very low.
	minWaitCheckFrequency := 10 * time.Millisecond
	maxWaitCheckFrequency := c.cfg.BatchWait / 10
	if maxWaitCheckFrequency < minWaitCheckFrequency {
		maxWaitCheckFrequency = minWaitCheckFrequency
	}

	maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)

	defer func() {
		maxWaitCheck.Stop()
		// Send all pending batches
		for tenantID, batch := range batches {
			c.sendBatch(tenantID, batch)
		}

		c.wg.Done()
	}()

	for {
		select {
		case e, ok := <-c.entries:
			if !ok {
				return
			}

			tenantID := c.getTenantID(e.Labels)
			batch, ok := batches[tenantID]

			// If the batch doesn't exist yet, we create a new one with the entry
			if !ok {
				batches[tenantID] = newBatch(e)
				break
			}

			// If adding the entry to the batch would increase the size over the max
			// size allowed, we send the current batch and then create a new one
			if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
				c.sendBatch(tenantID, batch)

				batches[tenantID] = newBatch(e)
				break
			}

			// The max size of the batch isn't reached, so we can add the entry
			batch.add(e)

		case <-maxWaitCheck.C:
			// Send all batches whose max wait time has been reached
			for tenantID, batch := range batches {
				if batch.age() < c.cfg.BatchWait {
					continue
				}

				c.sendBatch(tenantID, batch)
				delete(batches, tenantID)
			}
		}
	}
}

func (c *client) getTenantID(labels model.LabelSet) string {
	// Check if it has been overridden while processing the pipeline stages
	if value, ok := labels[ReservedLabelTenantID]; ok {
		return string(value)
	}

	// Check if it has been specified in the config
	if c.cfg.TenantID != "" {
		return c.cfg.TenantID
	}

	// Defaults to an empty string, which means the X-Scope-OrgID header
	// will not be sent
	return ""
}

func (c *client) Chan() chan<- Entry {
	return c.entries
}

func (c *client) sendBatch(tenantID string, batch *batch) {
	buf, entriesCount, err := batch.encode()
	if err != nil {
		c.logger.Error("error encoding batch", "error", err)
		return
	}
	bufBytes := float64(len(buf))
	c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

	backoff := backoff.New(c.ctx, c.cfg.BackoffConfig)
	var status int
	for {
		start := time.Now()
		// send uses `timeout` internally, so `context.Background` is good enough.
		status, err = c.send(context.Background(), tenantID, buf)
		c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

		// Only retry 429s, 500s and connection-level errors.
		if status > 0 && status != 429 && status/100 != 5 {
			break
		}

		c.logger.Warn("error sending batch, will retry", "status", status, "error", err)
		c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
		backoff.Wait()

		// Make sure it sends at least once before checking for retry.
		if !backoff.Ongoing() {
			break
		}
	}

	if err != nil {
		c.logger.Error("final error sending batch", "status", status, "error", err)
		c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
		c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
	}
}

func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
	ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
	defer cancel()

	req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
	if err != nil {
		return -1, err
	}
	req = req.WithContext(ctx)
	req.Header.Set("Content-Type", contentType)
	req.Header.Set("User-Agent", UserAgent)

	// If the tenant ID is not empty, promtail is running in multi-tenant mode,
	// so we should send it to Loki
	if tenantID != "" {
		req.Header.Set("X-Scope-OrgID", tenantID)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return -1, err
	}
	defer func() {
		err := resp.Body.Close()
		if err != nil {
			c.logger.Error("failed to close the response body", "error", err)
		}
	}()

	if resp.StatusCode/100 != 2 {
		scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
		line := ""
		if scanner.Scan() {
			line = scanner.Text()
		}
		err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
	}

	return resp.StatusCode, err
}

// Stop the client.
func (c *client) Stop() {
	c.once.Do(func() { close(c.entries) })
	c.wg.Wait()
}

// StopNow stops the client without retries.
func (c *client) StopNow() {
	// cancel will stop retrying http requests.
	c.cancel()
	c.Stop()
}
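
// The sketch below is illustrative and not part of the original client: it
// shows one way a caller could use the Tripperware hook to inject an extra
// header on every push request. The type and helper names are placeholders
// and are not referenced elsewhere in this package.

// headerTripper wraps an http.RoundTripper and sets a fixed header on a clone
// of each outgoing request (the RoundTripper contract forbids mutating the
// original request).
type headerTripper struct {
	next  http.RoundTripper
	key   string
	value string
}

func (t *headerTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	clone := req.Clone(req.Context())
	clone.Header.Set(t.key, t.value)
	return t.next.RoundTrip(clone)
}

// newClientWithExtraHeader is a hypothetical helper demonstrating
// NewWithTripperware; the extra header key/value are supplied by the caller.
func newClientWithExtraHeader(reg prometheus.Registerer, cfg Config, logger log.Logger, key, value string) (Client, error) {
	return NewWithTripperware(reg, cfg, logger, func(next http.RoundTripper) http.RoundTripper {
		if next == nil {
			next = http.DefaultTransport
		}
		return &headerTripper{next: next, key: key, value: value}
	})
}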