mirror of
https://github.com/grafana/loki.git
synced 2025-07-26 08:32:26 +08:00

**What this PR does / why we need it**:
Sometimes we want to not just fail a querytee but also report results on
the comparison between expected and actual results. That's why querytee
is extended with special metrics to give more insights into the
differences.
**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec
)
125 lines
3.4 KiB
Go
125 lines
3.4 KiB
Go
package querytee
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"regexp"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// ProxyBackend holds the information of a single backend.
|
|
type ProxyBackend struct {
|
|
name string
|
|
endpoint *url.URL
|
|
client *http.Client
|
|
timeout time.Duration
|
|
|
|
// Whether this is the preferred backend from which picking up
|
|
// the response and sending it back to the client.
|
|
preferred bool
|
|
|
|
// Only process requests that match the filter.
|
|
filter *regexp.Regexp
|
|
}
|
|
|
|
// NewProxyBackend makes a new ProxyBackend
|
|
func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, preferred bool) *ProxyBackend {
|
|
return &ProxyBackend{
|
|
name: name,
|
|
endpoint: endpoint,
|
|
timeout: timeout,
|
|
preferred: preferred,
|
|
client: &http.Client{
|
|
CheckRedirect: func(_ *http.Request, _ []*http.Request) error {
|
|
return errors.New("the query-tee proxy does not follow redirects")
|
|
},
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
DialContext: (&net.Dialer{
|
|
Timeout: 30 * time.Second,
|
|
KeepAlive: 30 * time.Second,
|
|
}).DialContext,
|
|
MaxIdleConns: 100,
|
|
MaxIdleConnsPerHost: 100, // see https://github.com/golang/go/issues/13801
|
|
IdleConnTimeout: 90 * time.Second,
|
|
DisableCompression: true,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (b *ProxyBackend) WithFilter(f *regexp.Regexp) *ProxyBackend {
|
|
b.filter = f
|
|
return b
|
|
}
|
|
|
|
func (b *ProxyBackend) ForwardRequest(orig *http.Request, body io.ReadCloser) (int, []byte, error) {
|
|
req := b.createBackendRequest(orig, body)
|
|
return b.doBackendRequest(req)
|
|
}
|
|
|
|
func (b *ProxyBackend) createBackendRequest(orig *http.Request, body io.ReadCloser) *http.Request {
|
|
req := orig.Clone(context.Background())
|
|
req.Body = body
|
|
// RequestURI can't be set on a cloned request. It's only for handlers.
|
|
req.RequestURI = ""
|
|
// Replace the endpoint with the backend one.
|
|
req.URL.Scheme = b.endpoint.Scheme
|
|
req.URL.Host = b.endpoint.Host
|
|
|
|
// Prepend the endpoint path to the request path.
|
|
req.URL.Path = path.Join(b.endpoint.Path, req.URL.Path)
|
|
|
|
// Set the correct host header for the backend
|
|
req.Header.Set("Host", b.endpoint.Host)
|
|
|
|
// Replace the auth:
|
|
// - If the endpoint has user and password, use it.
|
|
// - If the endpoint has user only, keep it and use the request password (if any).
|
|
// - If the endpoint has no user and no password, use the request auth (if any).
|
|
clientUser, clientPass, clientAuth := orig.BasicAuth()
|
|
endpointUser := b.endpoint.User.Username()
|
|
endpointPass, _ := b.endpoint.User.Password()
|
|
|
|
req.Header.Del("Authorization")
|
|
if endpointUser != "" && endpointPass != "" {
|
|
req.SetBasicAuth(endpointUser, endpointPass)
|
|
} else if endpointUser != "" {
|
|
req.SetBasicAuth(endpointUser, clientPass)
|
|
} else if clientAuth {
|
|
req.SetBasicAuth(clientUser, clientPass)
|
|
}
|
|
|
|
// Remove Accept-Encoding header to avoid sending compressed responses
|
|
req.Header.Del("Accept-Encoding")
|
|
|
|
return req
|
|
}
|
|
|
|
func (b *ProxyBackend) doBackendRequest(req *http.Request) (int, []byte, error) {
|
|
// Honor the read timeout.
|
|
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
|
|
defer cancel()
|
|
|
|
// Execute the request.
|
|
res, err := b.client.Do(req.WithContext(ctx))
|
|
if err != nil {
|
|
return 0, nil, errors.Wrap(err, "executing backend request")
|
|
}
|
|
|
|
// Read the entire response body.
|
|
defer res.Body.Close()
|
|
body, err := io.ReadAll(res.Body)
|
|
if err != nil {
|
|
return 0, nil, errors.Wrap(err, "reading backend response")
|
|
}
|
|
|
|
return res.StatusCode, body, nil
|
|
}
|