Elasticsearch: Implement processing of logs query results in backend (#63647)

* Elasticsearch: Add processing of logs query to backend

* Add and fix tests

* Add snapshot tests

* Fix test in ES client

* Small updates, remove redundant logic

* Refactor setPreferredVisType to improve readability
This commit is contained in:
Ivana Huckova
2023-03-01 11:50:56 +01:00
committed by GitHub
parent 8ac92aab19
commit d258c8ef8a
12 changed files with 1061 additions and 54 deletions

View File

@ -36,7 +36,7 @@ const (
logsType = "logs"
)
func parseResponse(responses []*es.SearchResponse, targets []*Query, timeField string) (*backend.QueryDataResponse, error) {
func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) {
result := backend.QueryDataResponse{
Responses: backend.Responses{},
}
@ -58,12 +58,19 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, timeField s
queryRes := backend.DataResponse{}
if isDocumentQuery(target) {
err := processDocumentResponse(res, target, timeField, &queryRes)
err := processDocumentResponse(res, target, configuredFields, &queryRes)
if err != nil {
return &backend.QueryDataResponse{}, err
}
result.Responses[target.RefID] = queryRes
} else if isLogsQuery(target) {
err := processLogsResponse(res, target, configuredFields, &queryRes)
if err != nil {
return &backend.QueryDataResponse{}, err
}
result.Responses[target.RefID] = queryRes
} else {
// Process as metric query result
props := make(map[string]string)
err := processBuckets(res.Aggregations, target, &queryRes, props, 0)
if err != nil {
@ -78,9 +85,56 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, timeField s
return &result, nil
}
func processDocumentResponse(res *es.SearchResponse, target *Query, timeField string, queryRes *backend.DataResponse) error {
docs := make([]map[string]interface{}, len(res.Hits.Hits))
func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error {
propNames := make(map[string]bool)
docs := make([]map[string]interface{}, len(res.Hits.Hits))
for hitIdx, hit := range res.Hits.Hits {
var flattened map[string]interface{}
if hit["_source"] != nil {
flattened = flatten(hit["_source"].(map[string]interface{}))
}
doc := map[string]interface{}{
"_id": hit["_id"],
"_type": hit["_type"],
"_index": hit["_index"],
"sort": hit["sort"],
"highlight": hit["highlight"],
"_source": flattened,
}
for k, v := range flattened {
if configuredFields.LogLevelField != "" && k == configuredFields.LogLevelField {
doc["level"] = v
} else {
doc[k] = v
}
}
for key := range doc {
propNames[key] = true
}
// TODO: Implement highlighting
docs[hitIdx] = doc
}
sortedPropNames := sortPropNames(propNames, configuredFields, true)
fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)
frames := data.Frames{}
frame := data.NewFrame("", fields...)
setPreferredVisType(frame, "logs")
frames = append(frames, frame)
queryRes.Frames = frames
return nil
}
func processDocumentResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error {
propNames := make(map[string]bool)
docs := make([]map[string]interface{}, len(res.Hits.Hits))
for hitIdx, hit := range res.Hits.Hits {
var flattened map[string]interface{}
@ -108,18 +162,28 @@ func processDocumentResponse(res *es.SearchResponse, target *Query, timeField st
docs[hitIdx] = doc
}
sortedPropNames := sortPropNames(propNames, configuredFields, false)
fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)
frames := data.Frames{}
frame := data.NewFrame("", fields...)
frames = append(frames, frame)
queryRes.Frames = frames
return nil
}
func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []string, configuredFields es.ConfiguredFields) []*data.Field {
size := len(docs)
isFilterable := true
allFields := make([]*data.Field, len(propNames))
sortedPropNames := sortPropNames(propNames, timeField)
for propNameIdx, propName := range sortedPropNames {
for propNameIdx, propName := range propNames {
// Special handling for time field
if propName == timeField {
if propName == configuredFields.TimeField {
timeVector := make([]*time.Time, size)
for i, doc := range docs {
timeString, ok := doc[timeField].(string)
timeString, ok := doc[configuredFields.TimeField].(string)
if !ok {
continue
}
@ -131,7 +195,7 @@ func processDocumentResponse(res *es.SearchResponse, target *Query, timeField st
timeVector[i] = &timeValue
}
}
field := data.NewField(timeField, nil, timeVector)
field := data.NewField(configuredFields.TimeField, nil, timeVector)
field.Config = &data.FieldConfig{Filterable: &isFilterable}
allFields[propNameIdx] = field
continue
@ -166,12 +230,7 @@ func processDocumentResponse(res *es.SearchResponse, target *Query, timeField st
}
}
frames := data.Frames{}
frame := data.NewFrame("", allFields...)
frames = append(frames, frame)
queryRes.Frames = frames
return nil
return allFields
}
func processBuckets(aggs map[string]interface{}, target *Query,
@ -894,14 +953,18 @@ func flatten(target map[string]interface{}) map[string]interface{} {
return output
}
// sortPropNames orders propNames so that timeField is first (if it exists) and rest of propNames are ordered alphabetically
func sortPropNames(propNames map[string]bool, timeField string) []string {
// sortPropNames orders propNames so that timeField is first (if it exists), log message field is second
// if shouldSortLogMessageField is true, and rest of propNames are ordered alphabetically
func sortPropNames(propNames map[string]bool, configuredFields es.ConfiguredFields, shouldSortLogMessageField bool) []string {
hasTimeField := false
hasLogMessageField := false
var sortedPropNames []string
for k := range propNames {
if k == timeField {
if configuredFields.TimeField != "" && k == configuredFields.TimeField {
hasTimeField = true
} else if shouldSortLogMessageField && configuredFields.LogMessageField != "" && k == configuredFields.LogMessageField {
hasLogMessageField = true
} else {
sortedPropNames = append(sortedPropNames, k)
}
@ -909,8 +972,12 @@ func sortPropNames(propNames map[string]bool, timeField string) []string {
sort.Strings(sortedPropNames)
if hasLogMessageField {
sortedPropNames = append([]string{configuredFields.LogMessageField}, sortedPropNames...)
}
if hasTimeField {
sortedPropNames = append([]string{timeField}, sortedPropNames...)
sortedPropNames = append([]string{configuredFields.TimeField}, sortedPropNames...)
}
return sortedPropNames
@ -939,3 +1006,11 @@ func createFieldOfType[T int | float64 | bool | string](docs []map[string]interf
field.Config = &data.FieldConfig{Filterable: &isFilterable}
return field
}
func setPreferredVisType(frame *data.Frame, visType data.VisType) {
if frame.Meta == nil {
frame.Meta = &data.FrameMeta{}
}
frame.Meta.PreferredVisualization = visType
}