mirror of
https://github.com/grafana/loki.git
synced 2025-08-02 11:37:22 +08:00
181 lines
5.2 KiB
Go
181 lines
5.2 KiB
Go
//go:build integration
|
|
|
|
package integration
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/prometheus/prometheus/storage/remote"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/grafana/loki/v3/integration/client"
|
|
"github.com/grafana/loki/v3/integration/cluster"
|
|
|
|
"github.com/grafana/loki/v3/pkg/ruler"
|
|
)
|
|
|
|
// TestLocalRuleEval tests that rules are evaluated locally with an embedded query engine
|
|
// and that the results are written to the backend correctly.
|
|
func TestLocalRuleEval(t *testing.T) {
|
|
testRuleEval(t, ruler.EvalModeLocal)
|
|
}
|
|
|
|
// TestRemoteRuleEval tests that rules are evaluated remotely against a configured query-frontend
|
|
// and that the results are written to the backend correctly.
|
|
func TestRemoteRuleEval(t *testing.T) {
|
|
testRuleEval(t, ruler.EvalModeRemote)
|
|
}
|
|
|
|
// The only way we can test rule evaluation in an integration test is to use the remote-write feature.
|
|
// In this test we stub out a remote-write receiver and check that the expected data is sent to it.
|
|
// Both the local and the remote rule evaluation modes should produce the same result.
|
|
func testRuleEval(t *testing.T, mode string) {
|
|
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
|
|
c.SetSchemaVer("v13")
|
|
})
|
|
t.Cleanup(func() {
|
|
assert.NoError(t, clu.Cleanup())
|
|
})
|
|
|
|
// initialise a write component and ingest some logs
|
|
tWrite := clu.AddComponent(
|
|
"write",
|
|
"-target=write",
|
|
)
|
|
|
|
now := time.Now()
|
|
tenantID := randStringRunes()
|
|
|
|
require.NoError(t, clu.Run())
|
|
|
|
job := "accesslog"
|
|
|
|
cliWrite := client.New(tenantID, "", tWrite.HTTPURL())
|
|
cliWrite.Now = now
|
|
|
|
// 1. Ingest some logs
|
|
require.NoError(t, cliWrite.PushLogLine("HEAD /", now, nil, map[string]string{"method": "HEAD", "job": job}))
|
|
require.NoError(t, cliWrite.PushLogLine("GET /", now, nil, map[string]string{"method": "GET", "job": job}))
|
|
require.NoError(t, cliWrite.PushLogLine("GET /", now.Add(time.Second), nil, map[string]string{"method": "GET", "job": job}))
|
|
|
|
// advance time to after the last ingested log line so queries don't return empty results
|
|
now = now.Add(time.Second * 2)
|
|
|
|
// start up read component for remote rule evaluation
|
|
tRead := clu.AddComponent(
|
|
"read",
|
|
"-target=read",
|
|
// we set a fake address here because deletion is not being tested,
|
|
// and we have a circular dependency with the backend
|
|
"-common.compactor-address=http://fake",
|
|
"-legacy-read-mode=false",
|
|
"-query-scheduler.use-scheduler-ring=false",
|
|
)
|
|
|
|
require.NoError(t, clu.Run())
|
|
|
|
// start up a backend component which contains the ruler
|
|
tBackend := clu.AddComponent(
|
|
"backend",
|
|
"-target=backend",
|
|
"-legacy-read-mode=false",
|
|
)
|
|
|
|
rwHandler := func(called *bool, test func(w http.ResponseWriter, r *http.Request)) *httptest.Server {
|
|
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path != "/api/v1/write" {
|
|
t.Errorf("Expected to request '/api/v1/write', got: %s", r.URL.Path)
|
|
}
|
|
|
|
test(w, r)
|
|
|
|
*called = true
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
}
|
|
|
|
// this is the function that will be called when the remote-write receiver receives a request.
|
|
// it tests that the expected payload is received.
|
|
expectedResults := func(_ http.ResponseWriter, r *http.Request) {
|
|
wr, err := remote.DecodeWriteRequest(r.Body)
|
|
require.NoError(t, err)
|
|
|
|
// depending on the rule interval, we may get multiple timeseries before remote-write is triggered,
|
|
// so we just check that we have at least one that matches our requirements.
|
|
require.GreaterOrEqual(t, len(wr.Timeseries), 1)
|
|
|
|
// we expect to see two GET lines from the aggregation in the recording rule
|
|
require.Equal(t, wr.Timeseries[len(wr.Timeseries)-1].Samples[0].Value, float64(2))
|
|
}
|
|
|
|
var called bool
|
|
server1 := rwHandler(&called, expectedResults)
|
|
defer server1.Close()
|
|
|
|
// configure the backend component
|
|
tBackend.WithRulerRemoteWrite("target1", server1.URL)
|
|
|
|
if mode == ruler.EvalModeRemote {
|
|
tBackend.WithExtraConfig(fmt.Sprintf(`
|
|
ruler:
|
|
evaluation:
|
|
mode: %s
|
|
query_frontend:
|
|
address: %s
|
|
`, mode, tRead.GRPCURL()))
|
|
}
|
|
|
|
record := fmt.Sprintf(`
|
|
groups:
|
|
- name: record
|
|
interval: 1s
|
|
rules:
|
|
- record: test
|
|
expr: sum by (method) (count_over_time({job="%s", method="GET"}[1m]))
|
|
labels:
|
|
foo: bar
|
|
`, job)
|
|
|
|
require.NoError(t, tBackend.WithTenantRules(map[string]map[string]string{
|
|
tenantID: {
|
|
"record.yaml": record,
|
|
},
|
|
}))
|
|
|
|
m, e := tBackend.MergedConfig()
|
|
require.NoError(t, e)
|
|
t.Logf("starting backend with config:\n%s\n", m)
|
|
|
|
require.NoError(t, clu.Run())
|
|
|
|
cliBackend := client.New(tenantID, "", tBackend.HTTPURL())
|
|
cliBackend.Now = now
|
|
|
|
// 2. Assert rules evaluation
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
// check rules exist
|
|
resp, err := cliBackend.GetRules(ctx)
|
|
|
|
require.NoError(t, err)
|
|
require.NotNil(t, resp)
|
|
|
|
require.Equal(t, "success", resp.Status)
|
|
|
|
require.Len(t, resp.Data.Groups, 1)
|
|
require.Len(t, resp.Data.Groups[0].Rules, 1)
|
|
|
|
// ensure that both remote-write receivers were called
|
|
require.Eventually(t, func() bool {
|
|
return assert.ObjectsAreEqualValues(true, called)
|
|
}, 30*time.Second, 100*time.Millisecond, "remote-write was not called")
|
|
}
|