Files
loki/tools/querytee/proxy_backend.go
Karsten Jeschkies 5ba154a6de Report missing metrics in querytee. (#10304)
**What this PR does / why we need it**:
Sometimes we want to not just fail a querytee but also report results on
the comparison between expected and actual results. That's why querytee
is extended with special metrics to give more insights into the
differences.

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
2023-08-24 12:59:47 +02:00

125 lines
3.4 KiB
Go

package querytee
import (
"context"
"io"
"net"
"net/http"
"net/url"
"path"
"regexp"
"time"
"github.com/pkg/errors"
)
// ProxyBackend holds the information of a single backend.
type ProxyBackend struct {
name string
endpoint *url.URL
client *http.Client
timeout time.Duration
// Whether this is the preferred backend from which picking up
// the response and sending it back to the client.
preferred bool
// Only process requests that match the filter.
filter *regexp.Regexp
}
// NewProxyBackend makes a new ProxyBackend
func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, preferred bool) *ProxyBackend {
return &ProxyBackend{
name: name,
endpoint: endpoint,
timeout: timeout,
preferred: preferred,
client: &http.Client{
CheckRedirect: func(_ *http.Request, _ []*http.Request) error {
return errors.New("the query-tee proxy does not follow redirects")
},
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100, // see https://github.com/golang/go/issues/13801
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
},
},
}
}
func (b *ProxyBackend) WithFilter(f *regexp.Regexp) *ProxyBackend {
b.filter = f
return b
}
func (b *ProxyBackend) ForwardRequest(orig *http.Request, body io.ReadCloser) (int, []byte, error) {
req := b.createBackendRequest(orig, body)
return b.doBackendRequest(req)
}
func (b *ProxyBackend) createBackendRequest(orig *http.Request, body io.ReadCloser) *http.Request {
req := orig.Clone(context.Background())
req.Body = body
// RequestURI can't be set on a cloned request. It's only for handlers.
req.RequestURI = ""
// Replace the endpoint with the backend one.
req.URL.Scheme = b.endpoint.Scheme
req.URL.Host = b.endpoint.Host
// Prepend the endpoint path to the request path.
req.URL.Path = path.Join(b.endpoint.Path, req.URL.Path)
// Set the correct host header for the backend
req.Header.Set("Host", b.endpoint.Host)
// Replace the auth:
// - If the endpoint has user and password, use it.
// - If the endpoint has user only, keep it and use the request password (if any).
// - If the endpoint has no user and no password, use the request auth (if any).
clientUser, clientPass, clientAuth := orig.BasicAuth()
endpointUser := b.endpoint.User.Username()
endpointPass, _ := b.endpoint.User.Password()
req.Header.Del("Authorization")
if endpointUser != "" && endpointPass != "" {
req.SetBasicAuth(endpointUser, endpointPass)
} else if endpointUser != "" {
req.SetBasicAuth(endpointUser, clientPass)
} else if clientAuth {
req.SetBasicAuth(clientUser, clientPass)
}
// Remove Accept-Encoding header to avoid sending compressed responses
req.Header.Del("Accept-Encoding")
return req
}
func (b *ProxyBackend) doBackendRequest(req *http.Request) (int, []byte, error) {
// Honor the read timeout.
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
defer cancel()
// Execute the request.
res, err := b.client.Do(req.WithContext(ctx))
if err != nil {
return 0, nil, errors.Wrap(err, "executing backend request")
}
// Read the entire response body.
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return 0, nil, errors.Wrap(err, "reading backend response")
}
return res.StatusCode, body, nil
}