Files
loki/integration/loki_simple_scalable_test.go
Sandeep Sukhani 6069df8f7d ingestion: native otlp ingestion support (#10727)
**What this PR does / why we need it**:
Add native support for ingesting logs in OTLP format.
`/otlp/v1/logs` is the new endpoint where users can push logs in OTLP
format. It accepts logs serialized in JSON or proto format. Since the OTEL
format is very different from Loki's storage model, here is how data
in OTEL format will be mapped to the Loki data model:

* Index labels: Resource Attributes map quite well to Index labels
in Loki since both usually identify the source of the logs. The problem,
however, is that OTLP can carry an unbounded number of Resource
attributes, while Loki has a default limit of 30 labels. Since Index
labels largely drive the querying experience users get in Loki, we have
chosen a select set of attributes to be used as Index labels. Attributes
that are not picked as Index labels are stored as Structured Metadata
with each log entry.
* Timestamp: LogRecord.TimeUnixNano
* LogLine: LogRecord.Body holds the body of the log. However, since Loki
only supports log bodies in string format, we will stringify non-string
values using the [AsString method from the OTEL collector
lib](https://github.com/open-telemetry/opentelemetry-collector/blob/ab3d6c5b64/pdata/pcommon/value.go#L353).
* Structured Metadata: Anything that can’t be stored in Index labels
or LogLine. Here is a non-exhaustive list of what will be stored in
Structured Metadata to give a sense of what it will hold:
* Resource Attributes not stored as Index labels are replicated and
stored with each log entry.
* Everything under InstrumentationScope is replicated and stored with
each log entry.
* Everything under LogRecord except LogRecord.Body,
LogRecord.TimeUnixNano and sometimes LogRecord.ObservedTimestamp.

*NOTES*:
* Since Loki does not support `.` or any other special characters other
than `_` in label names, we replace all non-supported characters with
`_`.
* Since Loki only supports string in values of Index Labels and
Structured Metadata, all the complex types are converted as follows:
* Maps are flattened into label keys using `_` as the separator, the same
way we do it in the [json parser in
LogQL](https://grafana.com/docs/loki/latest/query/log_queries/#json).
* Everything else is stringified using the [AsString method from the OTEL
collector
lib](https://github.com/open-telemetry/opentelemetry-collector/blob/ab3d6c5b64/pdata/pcommon/value.go#L353).

**Special notes for your reviewer**:
I will open follow-up PRs for:
* Documentation
* Making the blessed attributes list configurable per tenant.

**Checklist**
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
2023-10-23 11:53:36 +02:00

87 lines
2.3 KiB
Go

package integration
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"
)
// TestSimpleScalable_IngestQuery starts Loki as separate write, backend and
// read targets, pushes four log lines for the stream {job="fake"} through the
// write target, and checks that the read target returns them via a range
// query and reports the expected label names and label values.
func TestSimpleScalable_IngestQuery(t *testing.T) {
	clu := cluster.New(nil)
	defer func() {
		assert.NoError(t, clu.Cleanup())
	}()

	var (
		tWrite = clu.AddComponent(
			"write",
			"-target=write",
		)
		tBackend = clu.AddComponent(
			"backend",
			"-target=backend",
			"-legacy-read-mode=false",
		)
	)
	require.NoError(t, clu.Run())

	// The read target is added only after the first Run and is pointed at the
	// backend as its compactor. NOTE(review): presumably the two-phase start
	// is needed so tBackend.HTTPURL() is resolved first — confirm.
	tRead := clu.AddComponent(
		"read",
		"-target=read",
		"-common.compactor-address="+tBackend.HTTPURL(),
		"-legacy-read-mode=false",
	)
	require.NoError(t, clu.Run())

	tenantID := randStringRunes()
	now := time.Now()

	// Both clients share one reference instant so the pushed timestamps and
	// whatever time window the client derives from Now stay consistent.
	cliWrite := client.New(tenantID, "", tWrite.HTTPURL())
	cliWrite.Now = now
	cliRead := client.New(tenantID, "", tRead.HTTPURL())
	cliRead.Now = now

	ingested := t.Run("ingest logs", func(t *testing.T) {
		// Two lines 45 minutes in the past and two at "now".
		// Fresh label maps per call: it is not visible here whether
		// PushLogLine mutates its argument.
		require.NoError(t, cliWrite.PushLogLine("lineA", now.Add(-45*time.Minute), nil, map[string]string{"job": "fake"}))
		require.NoError(t, cliWrite.PushLogLine("lineB", now.Add(-45*time.Minute), nil, map[string]string{"job": "fake"}))
		require.NoError(t, cliWrite.PushLogLine("lineC", now, nil, map[string]string{"job": "fake"}))
		require.NoError(t, cliWrite.PushLogLine("lineD", now, nil, map[string]string{"job": "fake"}))
	})
	// Every following subtest reads back what was ingested above; if ingestion
	// failed they would only produce a cascade of misleading failures.
	require.True(t, ingested, "ingestion failed; skipping read-path subtests")

	t.Run("query", func(t *testing.T) {
		resp, err := cliRead.RunRangeQuery(context.Background(), `{job="fake"}`)
		require.NoError(t, err)
		assert.Equal(t, "streams", resp.Data.ResultType)

		var lines []string
		for _, stream := range resp.Data.Stream {
			for _, val := range stream.Values {
				// val[1] holds the log line (presumably val[0] is the timestamp).
				lines = append(lines, val[1])
			}
		}
		assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
	})

	t.Run("label-names", func(t *testing.T) {
		resp, err := cliRead.LabelNames(context.Background())
		require.NoError(t, err)
		assert.ElementsMatch(t, []string{"job"}, resp)
	})

	t.Run("label-values", func(t *testing.T) {
		resp, err := cliRead.LabelValues(context.Background(), "job")
		require.NoError(t, err)
		assert.ElementsMatch(t, []string{"fake"}, resp)
	})
}