// Mirror of https://github.com/grafana/loki.git (synced 2025-07-26 08:32:26 +08:00).
// 427 lines, 14 KiB, Go.

package querytee
|
|
|
|
import (
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/atomic"
|
|
)
|
|
|
|
func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) {
|
|
backendURL1, err := url.Parse("http://backend-1/")
|
|
require.NoError(t, err)
|
|
backendURL2, err := url.Parse("http://backend-2/")
|
|
require.NoError(t, err)
|
|
backendURL3, err := url.Parse("http://backend-3/")
|
|
require.NoError(t, err)
|
|
|
|
backendPref := NewProxyBackend("backend-1", backendURL1, time.Second, true)
|
|
backendOther1 := NewProxyBackend("backend-2", backendURL2, time.Second, false)
|
|
backendOther2 := NewProxyBackend("backend-3", backendURL3, time.Second, false)
|
|
|
|
tests := map[string]struct {
|
|
backends []*ProxyBackend
|
|
responses []*backendResponse
|
|
expected *ProxyBackend
|
|
}{
|
|
"the preferred backend is the 1st response received": {
|
|
backends: []*ProxyBackend{backendPref, backendOther1},
|
|
responses: []*backendResponse{
|
|
{backend: backendPref, status: 200},
|
|
},
|
|
expected: backendPref,
|
|
},
|
|
"the preferred backend is the last response received": {
|
|
backends: []*ProxyBackend{backendPref, backendOther1},
|
|
responses: []*backendResponse{
|
|
{backend: backendOther1, status: 200},
|
|
{backend: backendPref, status: 200},
|
|
},
|
|
expected: backendPref,
|
|
},
|
|
"the preferred backend is the last response received but it's not successful": {
|
|
backends: []*ProxyBackend{backendPref, backendOther1},
|
|
responses: []*backendResponse{
|
|
{backend: backendOther1, status: 200},
|
|
{backend: backendPref, status: 500},
|
|
},
|
|
expected: backendOther1,
|
|
},
|
|
"the preferred backend is the 2nd response received but only the last one is successful": {
|
|
backends: []*ProxyBackend{backendPref, backendOther1, backendOther2},
|
|
responses: []*backendResponse{
|
|
{backend: backendOther1, status: 500},
|
|
{backend: backendPref, status: 500},
|
|
{backend: backendOther2, status: 200},
|
|
},
|
|
expected: backendOther2,
|
|
},
|
|
"there's no preferred backend configured and the 1st response is successful": {
|
|
backends: []*ProxyBackend{backendOther1, backendOther2},
|
|
responses: []*backendResponse{
|
|
{backend: backendOther1, status: 200},
|
|
},
|
|
expected: backendOther1,
|
|
},
|
|
"there's no preferred backend configured and the last response is successful": {
|
|
backends: []*ProxyBackend{backendOther1, backendOther2},
|
|
responses: []*backendResponse{
|
|
{backend: backendOther1, status: 500},
|
|
{backend: backendOther2, status: 200},
|
|
},
|
|
expected: backendOther2,
|
|
},
|
|
"no received response is successful": {
|
|
backends: []*ProxyBackend{backendPref, backendOther1},
|
|
responses: []*backendResponse{
|
|
{backend: backendOther1, status: 500},
|
|
{backend: backendPref, status: 500},
|
|
},
|
|
expected: backendOther1,
|
|
},
|
|
}
|
|
|
|
for testName, testData := range tests {
|
|
t.Run(testName, func(t *testing.T) {
|
|
endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, false)
|
|
|
|
// Send the responses from a dedicated goroutine.
|
|
resCh := make(chan *backendResponse)
|
|
go func() {
|
|
for _, res := range testData.responses {
|
|
resCh <- res
|
|
}
|
|
close(resCh)
|
|
}()
|
|
|
|
// Wait for the selected backend response.
|
|
actual := endpoint.waitBackendResponseForDownstream(resCh)
|
|
assert.Equal(t, testData.expected, actual.backend)
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_ProxyEndpoint_Requests(t *testing.T) {
|
|
var (
|
|
requestCount atomic.Uint64
|
|
wg sync.WaitGroup
|
|
testHandler http.HandlerFunc
|
|
)
|
|
|
|
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
defer wg.Done()
|
|
defer requestCount.Add(1)
|
|
testHandler(w, r)
|
|
})
|
|
backend1 := httptest.NewServer(handler)
|
|
defer backend1.Close()
|
|
backendURL1, err := url.Parse(backend1.URL)
|
|
require.NoError(t, err)
|
|
|
|
backend2 := httptest.NewServer(handler)
|
|
defer backend2.Close()
|
|
backendURL2, err := url.Parse(backend2.URL)
|
|
require.NoError(t, err)
|
|
|
|
backends := []*ProxyBackend{
|
|
NewProxyBackend("backend-1", backendURL1, time.Second, true),
|
|
NewProxyBackend("backend-2", backendURL2, time.Second, false).WithFilter(regexp.MustCompile("/test/api")),
|
|
}
|
|
endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, false)
|
|
|
|
for _, tc := range []struct {
|
|
name string
|
|
request func(*testing.T) *http.Request
|
|
handler func(*testing.T) http.HandlerFunc
|
|
counts int
|
|
}{
|
|
{
|
|
name: "GET-request",
|
|
request: func(t *testing.T) *http.Request {
|
|
r, err := http.NewRequest("GET", "http://test/api/v1/test", nil)
|
|
r.Header["test-X"] = []string{"test-X-value"}
|
|
require.NoError(t, err)
|
|
return r
|
|
},
|
|
handler: func(t *testing.T) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
require.Equal(t, "test-X-value", r.Header.Get("test-X"))
|
|
_, _ = w.Write([]byte("ok"))
|
|
}
|
|
},
|
|
counts: 2,
|
|
},
|
|
{
|
|
name: "GET-filter-accept-encoding",
|
|
request: func(t *testing.T) *http.Request {
|
|
r, err := http.NewRequest("GET", "http://test/api/v1/test", nil)
|
|
r.Header.Set("Accept-Encoding", "gzip")
|
|
require.NoError(t, err)
|
|
return r
|
|
},
|
|
handler: func(t *testing.T) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
require.Equal(t, 0, len(r.Header.Values("Accept-Encoding")))
|
|
_, _ = w.Write([]byte("ok"))
|
|
}
|
|
},
|
|
counts: 2,
|
|
},
|
|
{
|
|
name: "GET-filtered",
|
|
request: func(t *testing.T) *http.Request {
|
|
r, err := http.NewRequest("GET", "http://will/not/pass/api/v1/test", nil)
|
|
require.NoError(t, err)
|
|
return r
|
|
},
|
|
handler: func(_ *testing.T) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, _ *http.Request) {
|
|
_, _ = w.Write([]byte("ok"))
|
|
}
|
|
},
|
|
counts: 1,
|
|
},
|
|
{
|
|
name: "POST-request-with-body",
|
|
request: func(t *testing.T) *http.Request {
|
|
strings := strings.NewReader("this-is-some-payload")
|
|
r, err := http.NewRequest("POST", "http://test/api/v1/test", strings)
|
|
require.NoError(t, err)
|
|
r.Header["test-X"] = []string{"test-X-value"}
|
|
return r
|
|
},
|
|
handler: func(t *testing.T) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
body, err := io.ReadAll(r.Body)
|
|
require.Equal(t, "this-is-some-payload", string(body))
|
|
require.NoError(t, err)
|
|
require.Equal(t, "test-X-value", r.Header.Get("test-X"))
|
|
|
|
_, _ = w.Write([]byte("ok"))
|
|
}
|
|
},
|
|
counts: 2,
|
|
},
|
|
} {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
// reset request count
|
|
requestCount.Store(0)
|
|
wg.Add(tc.counts)
|
|
|
|
if tc.handler == nil {
|
|
testHandler = func(w http.ResponseWriter, _ *http.Request) {
|
|
_, _ = w.Write([]byte("ok"))
|
|
}
|
|
|
|
} else {
|
|
testHandler = tc.handler(t)
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
endpoint.ServeHTTP(w, tc.request(t))
|
|
require.Equal(t, "ok", w.Body.String())
|
|
require.Equal(t, 200, w.Code)
|
|
|
|
wg.Wait()
|
|
require.Equal(t, uint64(tc.counts), requestCount.Load())
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_ProxyEndpoint_SummaryMetrics(t *testing.T) {
|
|
var (
|
|
requestCount atomic.Uint64
|
|
wg sync.WaitGroup
|
|
testHandler http.HandlerFunc
|
|
)
|
|
|
|
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
defer wg.Done()
|
|
defer requestCount.Add(1)
|
|
testHandler(w, r)
|
|
})
|
|
backend1 := httptest.NewServer(handler)
|
|
defer backend1.Close()
|
|
backendURL1, err := url.Parse(backend1.URL)
|
|
require.NoError(t, err)
|
|
|
|
backend2 := httptest.NewServer(handler)
|
|
defer backend2.Close()
|
|
backendURL2, err := url.Parse(backend2.URL)
|
|
require.NoError(t, err)
|
|
|
|
backends := []*ProxyBackend{
|
|
NewProxyBackend("backend-1", backendURL1, time.Second, true),
|
|
NewProxyBackend("backend-2", backendURL2, time.Second, false),
|
|
}
|
|
|
|
comparator := &mockComparator{}
|
|
proxyMetrics := NewProxyMetrics(prometheus.NewRegistry())
|
|
endpoint := NewProxyEndpoint(backends, "test", proxyMetrics, log.NewNopLogger(), comparator, true)
|
|
|
|
for _, tc := range []struct {
|
|
name string
|
|
request func(*testing.T) *http.Request
|
|
counts int
|
|
expectedMetrics string
|
|
}{
|
|
{
|
|
name: "missing-metrics-series",
|
|
request: func(t *testing.T) *http.Request {
|
|
r, err := http.NewRequest("GET", "http://test/api/v1/test", nil)
|
|
require.NoError(t, err)
|
|
return r
|
|
},
|
|
counts: 2,
|
|
expectedMetrics: `
|
|
# HELP cortex_querytee_missing_metrics_series Number of missing metrics (series) in a vector response.
|
|
# TYPE cortex_querytee_missing_metrics_series histogram
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.005"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.01"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.025"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.05"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.1"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.25"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.5"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.75"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="1"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="1.5"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="2"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="3"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="4"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="5"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="10"} 0
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="25"} 1
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="50"} 1
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="100"} 1
|
|
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="+Inf"} 1
|
|
cortex_querytee_missing_metrics_series_sum{backend="backend-2",issuer="unknown",route="test",status_code="success"} 12
|
|
cortex_querytee_missing_metrics_series_count{backend="backend-2",issuer="unknown",route="test",status_code="success"} 1
|
|
`,
|
|
},
|
|
} {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
// reset request count
|
|
requestCount.Store(0)
|
|
wg.Add(tc.counts)
|
|
|
|
testHandler = func(w http.ResponseWriter, _ *http.Request) {
|
|
_, _ = w.Write([]byte("ok"))
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
endpoint.ServeHTTP(w, tc.request(t))
|
|
require.Equal(t, "ok", w.Body.String())
|
|
require.Equal(t, 200, w.Code)
|
|
|
|
wg.Wait()
|
|
require.Equal(t, uint64(tc.counts), requestCount.Load())
|
|
|
|
require.Eventually(t, func() bool {
|
|
return prom_testutil.ToFloat64(proxyMetrics.responsesComparedTotal) == 1
|
|
}, 2*time.Second, 100*time.Millisecond, "expect exactly 1 response to be compared.")
|
|
err := prom_testutil.CollectAndCompare(proxyMetrics.missingMetrics, strings.NewReader(tc.expectedMetrics))
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_backendResponse_succeeded(t *testing.T) {
|
|
tests := map[string]struct {
|
|
resStatus int
|
|
resError error
|
|
expected bool
|
|
}{
|
|
"Error while executing request": {
|
|
resStatus: 0,
|
|
resError: errors.New("network error"),
|
|
expected: false,
|
|
},
|
|
"2xx response status code": {
|
|
resStatus: 200,
|
|
resError: nil,
|
|
expected: true,
|
|
},
|
|
"3xx response status code": {
|
|
resStatus: 300,
|
|
resError: nil,
|
|
expected: false,
|
|
},
|
|
"4xx response status code": {
|
|
resStatus: 400,
|
|
resError: nil,
|
|
expected: true,
|
|
},
|
|
"5xx response status code": {
|
|
resStatus: 500,
|
|
resError: nil,
|
|
expected: false,
|
|
},
|
|
}
|
|
|
|
for testName, testData := range tests {
|
|
t.Run(testName, func(t *testing.T) {
|
|
res := &backendResponse{
|
|
status: testData.resStatus,
|
|
err: testData.resError,
|
|
}
|
|
|
|
assert.Equal(t, testData.expected, res.succeeded())
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_backendResponse_statusCode(t *testing.T) {
|
|
tests := map[string]struct {
|
|
resStatus int
|
|
resError error
|
|
expected int
|
|
}{
|
|
"Error while executing request": {
|
|
resStatus: 0,
|
|
resError: errors.New("network error"),
|
|
expected: 500,
|
|
},
|
|
"200 response status code": {
|
|
resStatus: 200,
|
|
resError: nil,
|
|
expected: 200,
|
|
},
|
|
"503 response status code": {
|
|
resStatus: 503,
|
|
resError: nil,
|
|
expected: 503,
|
|
},
|
|
}
|
|
|
|
for testName, testData := range tests {
|
|
t.Run(testName, func(t *testing.T) {
|
|
res := &backendResponse{
|
|
status: testData.resStatus,
|
|
err: testData.resError,
|
|
}
|
|
|
|
assert.Equal(t, testData.expected, res.statusCode())
|
|
})
|
|
}
|
|
}
|
|
|
|
type mockComparator struct{}
|
|
|
|
func (c *mockComparator) Compare(_, _ []byte, _ time.Time) (*ComparisonSummary, error) {
|
|
return &ComparisonSummary{missingMetrics: 12}, nil
|
|
}
|