Files
Peter Štibraný 8fd5739576 [unified-storage/search] Don't expire file-based indexes, check for resource stats when building index on-demand (#107886)
* Get ResourceStats before indexing
* Replaced localcache.CacheService to handle expiration faster (localcache.CacheService / gocache.Cache only expires values at specific interval, but we need to close index faster)
* singleflight getOrBuildIndex for the same key
* expire only in-memory indexes
* file-based indexes have new name on each rebuild
* Sanitize file path segments, verify that generated path is within the root dir.
* Add comment and test for cleanOldIndexes.
2025-07-10 11:54:10 +00:00

1352 lines
36 KiB
Go

package search
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"math"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"time"
"github.com/blevesearch/bleve/v2"
"github.com/blevesearch/bleve/v2/analysis/analyzer/keyword"
"github.com/blevesearch/bleve/v2/analysis/analyzer/standard"
"github.com/blevesearch/bleve/v2/mapping"
"github.com/blevesearch/bleve/v2/search"
"github.com/blevesearch/bleve/v2/search/query"
bleveSearch "github.com/blevesearch/bleve/v2/search/searcher"
index "github.com/blevesearch/bleve_index_api"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/selection"
"github.com/grafana/grafana/pkg/services/dashboards/dashboardaccess"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
authlib "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
const (
// tracingPrexfixBleve is the prefix used for tracing spans in the Bleve backend.
// Span names in this file are built as tracingPrexfixBleve + "<operation>".
tracingPrexfixBleve = "unified_search.bleve."
)
// Compile-time assertions that the concrete types implement the resource interfaces.
var _ resource.SearchBackend = &bleveBackend{}
var _ resource.ResourceIndex = &bleveIndex{}
// BleveOptions configures the bleve search backend created by NewBleveBackend.
type BleveOptions struct {
// The root folder where file objects are saved.
// NewBleveBackend rejects an empty value and requires an existing directory.
Root string
// The resource count where values switch from memory to file based.
// BuildIndex uses a file-based index when size is strictly greater than this value.
FileThreshold int64
// How big should a batch get before flushing
// ?? not totally sure the units
BatchSize int
// Index cache TTL for bleve indices. 0 disables expiration for in-memory indexes.
// File-based indexes are never given an expiration.
IndexCacheTTL time.Duration
}
// bleveBackend is the bleve implementation of resource.SearchBackend.
// It keeps at most one open index per resource.NamespacedResource in cache.
type bleveBackend struct {
tracer trace.Tracer
log *slog.Logger
opts BleveOptions
// cacheMx guards cache.
cacheMx sync.RWMutex
// cache maps a key to its currently open index. Entries may be expired;
// getCachedIndex evicts and closes expired entries lazily.
cache map[resource.NamespacedResource]*bleveIndex
features featuremgmt.FeatureToggles
// indexMetrics is optional; metric updates are skipped when it is nil.
indexMetrics *resource.BleveIndexMetrics
}
// NewBleveBackend validates opts and returns a ready-to-use bleve backend.
//
// opts.Root must be non-empty and must refer to an existing directory; it is
// converted to an absolute path before being stored. A background goroutine
// that periodically refreshes the index size metric is started before returning.
func NewBleveBackend(opts BleveOptions, tracer trace.Tracer, features featuremgmt.FeatureToggles, indexMetrics *resource.BleveIndexMetrics) (*bleveBackend, error) {
	if opts.Root == "" {
		return nil, fmt.Errorf("bleve backend missing root folder configuration")
	}

	absolute, err := filepath.Abs(opts.Root)
	if err != nil {
		return nil, fmt.Errorf("error getting absolute path for bleve root folder %w", err)
	}
	opts.Root = absolute

	info, err := os.Stat(absolute)
	switch {
	case err != nil:
		return nil, fmt.Errorf("error opening bleve root folder %w", err)
	case !info.IsDir():
		return nil, fmt.Errorf("bleve root is configured against a file (not folder)")
	}

	backend := &bleveBackend{
		log:          slog.Default().With("logger", "bleve-backend"),
		tracer:       tracer,
		cache:        map[resource.NamespacedResource]*bleveIndex{},
		opts:         opts,
		features:     features,
		indexMetrics: indexMetrics,
	}

	// The metric updater loops forever; it returns immediately when indexMetrics is nil.
	go backend.updateIndexSizeMetric(absolute)

	return backend, nil
}
// GetIndex returns the cached, non-expired index for key.
// It returns (nil, nil) when no such index exists.
func (b *bleveBackend) GetIndex(_ context.Context, key resource.NamespacedResource) (resource.ResourceIndex, error) {
	// Only wrap a non-nil pointer in the interface: a nil *bleveIndex stored in
	// resource.ResourceIndex would compare as non-nil for callers.
	if cached := b.getCachedIndex(key); cached != nil {
		return cached, nil
	}
	return nil, nil
}
// getCachedIndex returns the cached index for key, or nil if there is none or
// the cached index has expired.
//
// An expired index is removed from the cache and closed as a side effect.
// Indexes with a zero expiration never expire.
func (b *bleveBackend) getCachedIndex(key resource.NamespacedResource) *bleveIndex {
	// Check index with read-lock first.
	b.cacheMx.RLock()
	val := b.cache[key]
	b.cacheMx.RUnlock()

	if val == nil {
		return nil
	}

	if val.expiration.IsZero() || val.expiration.After(time.Now()) {
		// Not expired yet.
		return val
	}

	// We're dealing with expired index. We need to remove it from the cache and close it.
	b.cacheMx.Lock()
	val = b.cache[key]
	// The entry may have been replaced (e.g. by BuildIndex) between releasing the
	// read lock and acquiring the write lock. Re-check the expiration so that a
	// freshly stored index is not evicted and closed by mistake.
	if val != nil && (val.expiration.IsZero() || val.expiration.After(time.Now())) {
		b.cacheMx.Unlock()
		return val
	}
	delete(b.cache, key)
	b.cacheMx.Unlock()

	if val == nil {
		// Another goroutine has already evicted the entry.
		return nil
	}

	// Index is no longer in the cache, but we need to close it.
	err := val.index.Close()
	if err != nil {
		b.log.Error("failed to close index", "key", key, "err", err)
	}
	b.log.Info("index evicted from cache", "key", key)
	return nil
}
// updateIndexSizeMetric publishes the combined size of all regular files below
// indexPath to the IndexSize metric, recomputing it every 60 seconds.
//
// It returns immediately when no metrics are configured; otherwise it never
// returns and is meant to run in its own goroutine.
func (b *bleveBackend) updateIndexSizeMetric(indexPath string) {
	if b.indexMetrics == nil {
		return
	}

	for {
		var total int64
		walkErr := filepath.WalkDir(indexPath, func(_ string, entry os.DirEntry, err error) error {
			if err != nil {
				return err
			}
			if entry.IsDir() {
				return nil
			}
			stat, err := entry.Info()
			if err != nil {
				return err
			}
			total += stat.Size()
			return nil
		})

		// Only publish a value when the whole tree was walked successfully.
		if walkErr != nil {
			b.log.Error("got error while trying to calculate bleve file index size", "error", walkErr)
		} else {
			b.indexMetrics.IndexSize.Set(float64(total))
		}

		time.Sleep(60 * time.Second)
	}
}
// BuildIndex builds an index from scratch.
// If built successfully, the new index replaces the old index in the cache (if there was any).
//
// Parameters:
//   - key identifies the namespace/group/resource the index belongs to.
//   - size is the expected number of documents; when it is greater than
//     BleveOptions.FileThreshold a file-based index is used, otherwise an in-memory one.
//   - resourceVersion is embedded in the directory name of file-based indexes and is
//     used to find a reusable index on disk.
//   - fields describes the resource-specific searchable fields.
//   - builder is called to populate the index, unless an existing file-based index is reused.
//
// On failure the newly opened index is closed, and a directory created by this call is
// removed, so that no handle or partially built index is left behind.
func (b *bleveBackend) BuildIndex(
	ctx context.Context,
	key resource.NamespacedResource,
	size int64,
	resourceVersion int64,
	fields resource.SearchableDocumentFields,
	builder func(index resource.ResourceIndex) (int64, error),
) (resource.ResourceIndex, error) {
	_, span := b.tracer.Start(ctx, tracingPrexfixBleve+"BuildIndex")
	defer span.End()

	var index bleve.Index
	fileIndexName := "" // Name of the file-based index, or empty for in-memory indexes.
	build := true

	mapper, err := GetBleveMappings(fields)
	if err != nil {
		return nil, err
	}

	cachedIndex := b.getCachedIndex(key)

	logWithDetails := b.log.With("namespace", key.Namespace, "group", key.Group, "resource", key.Resource, "size", size, "rv", resourceVersion)

	resourceDir := filepath.Join(b.opts.Root, cleanFileSegment(key.Namespace), cleanFileSegment(fmt.Sprintf("%s.%s", key.Resource, key.Group)))

	if size > b.opts.FileThreshold {
		// We only check for the existing file-based index if we don't already have an open index for this key.
		// This happens on startup, or when memory-based index has expired. (We don't expire file-based indexes)
		// If we do have an unexpired cached index already, we always build a new index from scratch.
		if cachedIndex == nil && resourceVersion > 0 {
			index, fileIndexName = b.findPreviousFileBasedIndex(resourceDir, resourceVersion, size)
		}

		if index != nil {
			build = false
			logWithDetails.Debug("Existing index found on filesystem", "directory", filepath.Join(resourceDir, fileIndexName))
		} else {
			// Building index from scratch. Index name has a time component in it to be unique, but if
			// we happen to create non-unique name, we bump the time and try again.
			indexDir := ""
			now := time.Now()
			for index == nil {
				// Use the (possibly bumped) "now" so that a retry really produces a new name.
				fileIndexName = formatIndexName(now, resourceVersion)
				indexDir = filepath.Join(resourceDir, fileIndexName)
				if !isPathWithinRoot(indexDir, b.opts.Root) {
					return nil, fmt.Errorf("invalid path %s", indexDir)
				}

				index, err = bleve.New(indexDir, mapper)
				if errors.Is(err, bleve.ErrorIndexPathExists) {
					now = now.Add(time.Second) // Bump time for next try
					index = nil                // Bleve actually returns non-nil value with ErrorIndexPathExists
					continue
				}
				if err != nil {
					return nil, fmt.Errorf("error creating new bleve index: %s %w", indexDir, err)
				}
			}

			logWithDetails.Info("Building index using filesystem", "directory", indexDir)
		}

		if b.indexMetrics != nil {
			b.indexMetrics.IndexTenants.WithLabelValues("file").Inc()
		}
	} else {
		index, err = bleve.NewMemOnly(mapper)
		if err != nil {
			return nil, fmt.Errorf("error creating new in-memory bleve index: %w", err)
		}
		if b.indexMetrics != nil {
			b.indexMetrics.IndexTenants.WithLabelValues("memory").Inc()
		}
		logWithDetails.Info("Building index using memory")
	}

	// discardIndex releases an index that will not be stored in the cache. A directory
	// that was created by this call (build == true) is removed as well; a reused
	// on-disk index is only closed.
	discardIndex := func() {
		if closeErr := index.Close(); closeErr != nil {
			logWithDetails.Warn("failed to close index after unsuccessful build", "err", closeErr)
		}
		if build && fileIndexName != "" {
			dir := filepath.Join(resourceDir, fileIndexName)
			if rmErr := os.RemoveAll(dir); rmErr != nil {
				logWithDetails.Warn("failed to remove partially built index", "directory", dir, "err", rmErr)
			}
		}
	}

	// Batch all the changes
	idx := &bleveIndex{
		key:      key,
		index:    index,
		fields:   fields,
		standard: resource.StandardSearchFields(),
		features: b.features,
		tracing:  b.tracer,
	}

	idx.allFields, err = getAllFields(idx.standard, fields)
	if err != nil {
		discardIndex()
		return nil, err
	}

	if build {
		start := time.Now()
		_, err = builder(idx)
		if err != nil {
			discardIndex()
			return nil, err
		}
		elapsed := time.Since(start)
		logWithDetails.Info("Finished building index", "elapsed", elapsed)
	}

	// Set expiration after building the index. Only expire in-memory indexes.
	if fileIndexName == "" && b.opts.IndexCacheTTL > 0 {
		idx.expiration = time.Now().Add(b.opts.IndexCacheTTL)
	}

	// Store the index in the cache.
	if idx.expiration.IsZero() {
		logWithDetails.Info("Storing index in cache, with no expiration", "key", key)
	} else {
		logWithDetails.Info("Storing index in cache", "key", key, "expiration", idx.expiration)
	}

	b.cacheMx.Lock()
	prev := b.cache[key]
	b.cache[key] = idx
	b.cacheMx.Unlock()

	// If there was a previous index in the cache, close it.
	if prev != nil {
		err := prev.index.Close()
		if err != nil {
			logWithDetails.Error("failed to close previous index", "key", key, "err", err)
		}
	}

	// Start a background task to cleanup the old index directories. If we have built a new file-based index,
	// the new name is ignored. If we have created in-memory index and fileIndexName is empty, all old directories can be removed.
	go b.cleanOldIndexes(resourceDir, fileIndexName)

	return idx, nil
}
// cleanFileSegment makes input usable as a single path segment by replacing
// every OS path separator and every ".." sequence with "_".
func cleanFileSegment(input string) string {
	// Order matters: separators are replaced first, then ".." sequences.
	for _, unsafe := range []string{string(filepath.Separator), ".."} {
		input = strings.ReplaceAll(input, unsafe, "_")
	}
	return input
}
// cleanOldIndexes removes every subdirectory of dir except the one named skipName.
// skipName may be empty, in which case all subdirectories are removed.
//
// A missing dir is not an error. Directories that do not resolve to a path
// inside the backend root are skipped. Failures are logged, never returned.
func (b *bleveBackend) cleanOldIndexes(dir string, skipName string) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		if !os.IsNotExist(err) {
			b.log.Warn("error cleaning folders from", "directory", dir, "error", err)
		}
		return
	}

	for _, entry := range entries {
		if !entry.IsDir() || entry.Name() == skipName {
			continue
		}

		target := filepath.Join(dir, entry.Name())
		if !isPathWithinRoot(target, b.opts.Root) {
			b.log.Warn("Skipping cleanup of directory", "directory", target)
			continue
		}

		if rmErr := os.RemoveAll(target); rmErr != nil {
			b.log.Error("Unable to remove old index folder", "directory", target, "error", rmErr)
			continue
		}
		b.log.Info("Removed old index folder", "directory", target)
	}
}
// isPathWithinRoot verifies that path is within given absoluteRoot.
//
// path is made absolute first. It is considered "within" the root when it is
// the root itself or a descendant of it. The comparison is done on whole path
// segments, so a sibling such as "/data/bleve-other" is NOT within "/data/bleve"
// even though it shares a string prefix with it.
//
// It returns false if either argument is empty or the paths cannot be related.
func isPathWithinRoot(path, absoluteRoot string) bool {
	if path == "" || absoluteRoot == "" {
		return false
	}

	absPath, err := filepath.Abs(path)
	if err != nil {
		return false
	}

	// Rel cleans both paths and fails if one is absolute and the other is not.
	rel, err := filepath.Rel(absoluteRoot, absPath)
	if err != nil {
		return false
	}

	// A relative path that is ".." or starts with "../" escapes the root.
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return false
	}
	return true
}
// cacheKeys returns a snapshot of the keys currently present in the cache.
// Keys of expired (but not yet evicted) indexes are included.
func (b *bleveBackend) cacheKeys() []resource.NamespacedResource {
	b.cacheMx.RLock()
	snapshot := make([]resource.NamespacedResource, 0, len(b.cache))
	for key := range b.cache {
		snapshot = append(snapshot, key)
	}
	b.cacheMx.RUnlock()

	return snapshot
}
// TotalDocs returns the total number of documents across all cached indexes.
// Indexes whose document count cannot be read are skipped.
func (b *bleveBackend) TotalDocs() int64 {
	var sum int64

	// Work from a snapshot of the keys and fetch each index individually, so the
	// cache lock is not held while DocCount runs (it may be slow due to disk access).
	// Going through getCachedIndex also takes care of index expiration.
	for _, key := range b.cacheKeys() {
		cached := b.getCachedIndex(key)
		if cached == nil {
			continue
		}
		if count, err := cached.index.DocCount(); err == nil {
			sum += int64(count)
		}
	}

	return sum
}
// formatIndexName returns the directory name of a file-based index:
// "<YYYYMMDD>-<HHMMSS>-<resourceVersion>".
func formatIndexName(now time.Time, resourceVersion int64) string {
	const layout = "20060102-150405"
	return fmt.Sprintf("%s-%d", now.Format(layout), resourceVersion)
}
// findPreviousFileBasedIndex looks in resourceDir for an index directory that
// was built for resourceVersion and tries to reuse it.
//
// Directory names are expected to have three "-"-separated parts with the
// resource version last (see formatIndexName). Only the first matching
// directory is considered. The index is returned, together with its directory
// name, only if it opens successfully and its document count equals size.
// In every other case (nil, "") is returned.
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string, resourceVersion int64, size int64) (bleve.Index, string) {
	entries, err := os.ReadDir(resourceDir)
	if err != nil {
		return nil, ""
	}

	candidate := ""
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		segments := strings.Split(entry.Name(), "-")
		if len(segments) != 3 {
			continue
		}
		// Last part is resourceVersion
		rv, parseErr := strconv.ParseInt(segments[2], 10, 64)
		if parseErr == nil && rv == resourceVersion {
			candidate = entry.Name()
			break
		}
	}
	if candidate == "" {
		return nil, ""
	}

	opened, err := bleve.Open(filepath.Join(resourceDir, candidate))
	if err != nil {
		return nil, ""
	}

	// Reject the index when the count is unreadable or differs from the expected size.
	docs, err := opened.DocCount()
	if err != nil || docs != uint64(size) {
		_ = opened.Close()
		return nil, ""
	}

	return opened, candidate
}
// closeAllIndexes closes every cached index and empties the cache.
// Errors returned by Close are ignored.
func (b *bleveBackend) closeAllIndexes() {
	b.cacheMx.Lock()
	defer b.cacheMx.Unlock()

	for _, cached := range b.cache {
		_ = cached.index.Close()
	}
	clear(b.cache)
}
// bleveIndex is the bleve implementation of resource.ResourceIndex for a single
// namespaced resource.
type bleveIndex struct {
// key identifies the namespace/group/resource this index was built for.
key resource.NamespacedResource
// index is the underlying bleve index (in-memory or file-based).
index bleve.Index
// standard holds the standard search fields; fields holds the resource-specific ones.
standard resource.SearchableDocumentFields
fields resource.SearchableDocumentFields
// When to expire and close the index. Zero value = no expiration.
// We only expire in-memory indexes.
expiration time.Time
// The values returned with all
// (used by hitsToTable when the "_all" field is selected).
allFields []*resourcepb.ResourceTableColumnDefinition
features featuremgmt.FeatureToggles
tracing trace.Tracer
}
// BulkIndex implements resource.ResourceIndex.
//
// All items of the request are applied to the index as one bleve batch.
// Index actions require a document; delete actions remove the document
// identified by the item key. Items with any other action are ignored.
func (b *bleveIndex) BulkIndex(req *resource.BulkIndexRequest) error {
	if len(req.Items) == 0 {
		return nil
	}

	pending := b.index.NewBatch()
	for _, item := range req.Items {
		if item.Action == resource.ActionDelete {
			pending.Delete(resource.SearchID(item.Key))
			continue
		}
		if item.Action != resource.ActionIndex {
			continue
		}

		if item.Doc == nil {
			return fmt.Errorf("missing document")
		}
		doc := item.Doc.UpdateCopyFields()
		if err := pending.Index(resource.SearchID(doc.Key), doc); err != nil {
			return err
		}
	}

	return b.index.Batch(pending)
}
// ListManagedObjects returns all indexed objects managed by the manager
// identified by req.Kind and req.Id, sorted by source path.
//
// Paging is not supported: a non-empty req.NextPageToken is an error. An empty
// kind or id yields a response carrying a bad-request error (and a nil Go error).
func (b *bleveIndex) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
	if req.NextPageToken != "" {
		return nil, fmt.Errorf("next page not implemented yet")
	}

	if req.Kind == "" {
		return &resourcepb.ListManagedObjectsResponse{
			Error: resource.NewBadRequestError("empty manager kind"),
		}, nil
	}
	if req.Id == "" {
		return &resourcepb.ListManagedObjectsResponse{
			Error: resource.NewBadRequestError("empty manager id"),
		}, nil
	}

	// Both the manager kind and the manager id have to match.
	q := bleve.NewBooleanQuery()
	q.AddMust(&query.TermQuery{
		Term:     req.Kind,
		FieldVal: resource.SEARCH_FIELD_MANAGER_KIND,
	})
	q.AddMust(&query.TermQuery{
		Term:     req.Id,
		FieldVal: resource.SEARCH_FIELD_MANAGER_ID,
	})

	found, err := b.index.SearchInContext(ctx, &bleve.SearchRequest{
		Query: q,
		Fields: []string{
			resource.SEARCH_FIELD_TITLE,
			resource.SEARCH_FIELD_FOLDER,
			resource.SEARCH_FIELD_MANAGER_KIND,
			resource.SEARCH_FIELD_MANAGER_ID,
			resource.SEARCH_FIELD_SOURCE_PATH,
			resource.SEARCH_FIELD_SOURCE_CHECKSUM,
			resource.SEARCH_FIELD_SOURCE_TIME,
		},
		Sort: search.SortOrder{
			&search.SortField{
				Field: resource.SEARCH_FIELD_SOURCE_PATH,
				Type:  search.SortFieldAsString,
				Desc:  false,
			},
		},
		Size: 1000000000, // big number
		From: 0,          // next page token not yet supported
	})
	if err != nil {
		return nil, err
	}

	// asString converts a stored field value to a string; nil becomes "".
	asString := func(v any) string {
		if v == nil {
			return ""
		}
		str, ok := v.(string)
		if ok {
			return str
		}
		return fmt.Sprintf("%v", v)
	}

	// asTime converts a stored field value to an integer timestamp. Numeric values
	// are returned as they are; strings are parsed as RFC 3339 and converted to Unix
	// milliseconds. Anything else, including unparsable strings, yields 0.
	asTime := func(v any) int64 {
		if v == nil {
			return 0
		}
		intV, ok := v.(int64)
		if ok {
			return intV
		}
		floatV, ok := v.(float64)
		if ok {
			return int64(floatV)
		}
		str, ok := v.(string)
		if ok {
			t, err := time.Parse(time.RFC3339, str)
			if err != nil {
				// Without this check the zero time would be converted to a large
				// negative number of milliseconds.
				return 0
			}
			return t.UnixMilli()
		}
		return 0
	}

	rsp := &resourcepb.ListManagedObjectsResponse{}
	for _, hit := range found.Hits {
		item := &resourcepb.ListManagedObjectsResponse_Item{
			Object: &resourcepb.ResourceKey{},
			Hash:   asString(hit.Fields[resource.SEARCH_FIELD_SOURCE_CHECKSUM]),
			Path:   asString(hit.Fields[resource.SEARCH_FIELD_SOURCE_PATH]),
			Time:   asTime(hit.Fields[resource.SEARCH_FIELD_SOURCE_TIME]),
			Title:  asString(hit.Fields[resource.SEARCH_FIELD_TITLE]),
			Folder: asString(hit.Fields[resource.SEARCH_FIELD_FOLDER]),
		}
		// The object key is decoded from the document ID.
		err := resource.ReadSearchID(item.Object, hit.ID)
		if err != nil {
			return nil, err
		}
		rsp.Items = append(rsp.Items, item)
	}
	return rsp, nil
}
// CountManagedObjects returns the number of indexed objects per manager.
//
// The counts come from a facet on the managed-by field, whose terms are
// expected in the form "<kind>:<id>". Terms without a ":" or with an empty
// kind are skipped.
func (b *bleveIndex) CountManagedObjects(ctx context.Context) ([]*resourcepb.CountManagedObjectsResponse_ResourceCount, error) {
	const facetName = "count"

	result, err := b.index.SearchInContext(ctx, &bleve.SearchRequest{
		Query: bleve.NewMatchAllQuery(),
		Size:  0,
		Facets: bleve.FacetsRequest{
			facetName: bleve.NewFacetRequest(resource.SEARCH_FIELD_MANAGED_BY, 1000), // typically less then 5
		},
	})
	if err != nil {
		return nil, err
	}

	counts := make([]*resourcepb.CountManagedObjectsResponse_ResourceCount, 0)
	facet, ok := result.Facets[facetName]
	if !ok || facet.Terms == nil {
		return counts, nil
	}

	for _, term := range facet.Terms.Terms() {
		kind, id, hasSep := strings.Cut(term.Term, ":")
		if !hasSep || kind == "" {
			continue
		}
		counts = append(counts, &resourcepb.CountManagedObjectsResponse_ResourceCount{
			Kind:     kind,
			Id:       id,
			Group:    b.key.Group,
			Resource: b.key.Resource,
			Count:    int64(term.Count),
		})
	}
	return counts, nil
}
// Search implements resource.DocumentIndex.
//
// Request problems (missing or mismatching key, invalid query) are reported in
// the Error field of the response together with a nil Go error; failures of the
// index itself are returned as Go errors.
func (b *bleveIndex) Search(
	ctx context.Context,
	access authlib.AccessClient,
	req *resourcepb.ResourceSearchRequest,
	federate []resource.ResourceIndex, // For federated queries, these will match the values in req.federate
) (*resourcepb.ResourceSearchResponse, error) {
	ctx, span := b.tracing.Start(ctx, tracingPrexfixBleve+"Search")
	defer span.End()

	if req.Options == nil || req.Options.Key == nil {
		return &resourcepb.ResourceSearchResponse{
			Error: resource.NewBadRequestError("missing query key"),
		}, nil
	}
	if keyErr := b.verifyKey(req.Options.Key); keyErr != nil {
		return &resourcepb.ResourceSearchResponse{Error: keyErr}, nil
	}
	response := &resourcepb.ResourceSearchResponse{}

	// Verifies the index federation
	target, err := b.getIndex(ctx, req, federate)
	if err != nil {
		return nil, err
	}

	// convert protobuf request to bleve request
	bleveReq, reqErr := b.toBleveSearchRequest(ctx, req, access)
	if reqErr != nil {
		response.Error = reqErr
		return response, nil
	}

	// Show all fields when nothing is selected
	if len(bleveReq.Fields) == 0 && req.Limit > 0 {
		known, err := b.index.Fields()
		if err != nil {
			return nil, err
		}
		if len(known) == 0 {
			known = []string{
				resource.SEARCH_FIELD_TITLE,
				resource.SEARCH_FIELD_FOLDER,
				resource.SEARCH_FIELD_SOURCE_PATH,
				resource.SEARCH_FIELD_MANAGED_BY,
			}
		}
		bleveReq.Fields = known
	}

	result, err := target.SearchInContext(ctx, bleveReq)
	if err != nil {
		return nil, err
	}

	response.TotalHits = int64(result.Total)
	response.QueryCost = float64(result.Cost)
	response.MaxScore = result.MaxScore

	response.Results, err = b.hitsToTable(ctx, bleveReq.Fields, result.Hits, req.Explain)
	if err != nil {
		return nil, err
	}

	// parse the facet fields; the map stays nil when there are no facets
	if len(result.Facets) > 0 {
		response.Facet = make(map[string]*resourcepb.ResourceSearchResponse_Facet)
		for name, facet := range result.Facets {
			response.Facet[name] = newResponseFacet(facet)
		}
	}
	return response, nil
}
// DocCount returns the number of documents in the index. When folder is not
// empty, only documents whose folder field equals folder are counted.
func (b *bleveIndex) DocCount(ctx context.Context, folder string) (int64, error) {
	ctx, span := b.tracing.Start(ctx, tracingPrexfixBleve+"DocCount")
	defer span.End()

	if folder == "" {
		total, err := b.index.DocCount()
		return int64(total), err
	}

	folderQuery := &query.TermQuery{
		Term:     folder,
		FieldVal: resource.SEARCH_FIELD_FOLDER,
	}
	rsp, err := b.index.SearchInContext(ctx, &bleve.SearchRequest{
		Size:   0, // we just need the count
		Fields: []string{},
		Query:  folderQuery,
	})
	if rsp != nil {
		return int64(rsp.Total), err
	}
	return 0, err
}
// verifyKey checks that key refers to the namespace, group and resource this
// index was built for. It returns a bad-request error naming the first
// mismatching part, or nil when everything matches.
func (b *bleveIndex) verifyKey(key *resourcepb.ResourceKey) *resourcepb.ErrorResult {
	switch {
	case key.Namespace != b.key.Namespace:
		return resource.NewBadRequestError("namespace mismatch (expected " + b.key.Namespace + ")")
	case key.Group != b.key.Group:
		return resource.NewBadRequestError("group mismatch (expected " + b.key.Group + ")")
	case key.Resource != b.key.Resource:
		return resource.NewBadRequestError("resource mismatch (expected " + b.key.Resource + ")")
	default:
		return nil
	}
}
// getIndex returns the bleve index a search request has to be executed on.
//
// Without federation this is the index itself. With federation it is an index
// alias over this index and all federated ones
// (https://blevesearch.com/docs/IndexAlias/). Every federated index must be a
// *bleveIndex whose key matches the corresponding entry of req.Federated.
func (b *bleveIndex) getIndex(
	ctx context.Context,
	req *resourcepb.ResourceSearchRequest,
	federate []resource.ResourceIndex,
) (bleve.Index, error) {
	_, span := b.tracing.Start(ctx, tracingPrexfixBleve+"getIndex")
	defer span.End()

	if len(req.Federated) != len(federate) {
		return nil, fmt.Errorf("federation is misconfigured")
	}
	if len(federate) == 0 {
		return b.index, nil
	}

	members := make([]bleve.Index, 0, len(federate)+1)
	members = append(members, b.index)
	for pos, other := range federate {
		typed, isBleve := other.(*bleveIndex)
		if !isBleve {
			return nil, fmt.Errorf("federated indexes must be the same type")
		}
		if typed.verifyKey(req.Federated[pos]) != nil {
			return nil, fmt.Errorf("federated index keys do not match (%v != %v)", typed, req.Federated[pos])
		}
		members = append(members, typed.index)
	}
	return bleve.NewIndexAlias(members...), nil
}
// toBleveSearchRequest converts a protobuf search request into a bleve search request.
//
// The resulting query is the conjunction of the label requirements, the field
// requirements and the text query. When access is not nil and permission
// filtering is enabled, the query is wrapped so that only documents the caller
// may access are returned. req.Options must not be nil.
//
// Invalid requests are reported through the returned *resourcepb.ErrorResult.
func (b *bleveIndex) toBleveSearchRequest(ctx context.Context, req *resourcepb.ResourceSearchRequest, access authlib.AccessClient) (*bleve.SearchRequest, *resourcepb.ErrorResult) {
	ctx, span := b.tracing.Start(ctx, tracingPrexfixBleve+"toBleveSearchRequest")
	defer span.End()

	// Facets keyed by field name. Facets keyed by their request name are added further below.
	facets := bleve.FacetsRequest{}
	for _, f := range req.Facet {
		facets[f.Field] = bleve.NewFacetRequest(f.Field, int(f.Limit))
	}

	// Convert resource-specific fields to bleve fields (just considers dashboard fields for now)
	fields := make([]string, 0, len(req.Fields))
	for _, f := range req.Fields {
		if slices.Contains(DashboardFields(), f) {
			f = resource.SEARCH_FIELD_PREFIX + f
		}
		fields = append(fields, f)
	}

	size, err := safeInt64ToInt(req.Limit)
	if err != nil {
		return nil, resource.AsErrorResult(err)
	}
	offset, err := safeInt64ToInt(req.Offset)
	if err != nil {
		return nil, resource.AsErrorResult(err)
	}

	searchrequest := &bleve.SearchRequest{
		Fields:  fields,
		Size:    size,
		From:    offset,
		Explain: req.Explain,
		Facets:  facets,
	}

	// Currently everything is within an AND query
	queries := []query.Query{}
	for _, v := range req.Options.Labels {
		q, err := requirementQuery(v, "labels.")
		if err != nil {
			return nil, err
		}
		queries = append(queries, q)
	}

	// filters
	for _, v := range req.Options.Fields {
		q, err := requirementQuery(v, "")
		if err != nil {
			return nil, err
		}
		queries = append(queries, q)
	}

	if len(req.Query) > 1 && strings.Contains(req.Query, "*") {
		// wildcard query is expensive - should be used with caution
		wildcard := bleve.NewWildcardQuery(req.Query)
		queries = append(queries, wildcard)
	}

	if req.Query != "" && !strings.Contains(req.Query, "*") {
		// Add a text query
		searchrequest.Fields = append(searchrequest.Fields, resource.SEARCH_FIELD_SCORE)

		// There are multiple ways to match the query string to documents. The following queries are ordered by priority:

		// Query 1: Match the exact query string
		queryExact := bleve.NewMatchQuery(req.Query)
		queryExact.SetBoost(10.0)
		queryExact.Analyzer = keyword.Name // don't analyze the query input - treat it as a single token

		// Query 2: Phrase query with standard analyzer
		queryPhrase := bleve.NewMatchPhraseQuery(req.Query)
		// The boost belongs to the phrase query; setting it on queryExact would
		// overwrite the exact-match boost and leave the phrase query unboosted.
		queryPhrase.SetBoost(5.0)
		queryPhrase.Analyzer = standard.Name

		// Query 3: Match query with standard analyzer
		queryAnalyzed := bleve.NewMatchQuery(req.Query)
		queryAnalyzed.Analyzer = standard.Name

		// At least one of the queries must match
		searchQuery := bleve.NewDisjunctionQuery(queryExact, queryAnalyzed, queryPhrase)
		queries = append(queries, searchQuery)
	}

	switch len(queries) {
	case 0:
		searchrequest.Query = bleve.NewMatchAllQuery()
	case 1:
		searchrequest.Query = queries[0]
	default:
		searchrequest.Query = bleve.NewConjunctionQuery(queries...) // AND
	}

	if access != nil && b.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearchPermissionFiltering) {
		auth, ok := authlib.AuthInfoFrom(ctx)
		if !ok {
			return nil, resource.AsErrorResult(fmt.Errorf("missing auth info"))
		}

		// Edit permission is checked with the patch verb, everything else with list.
		verb := utils.VerbList
		if req.Permission == int64(dashboardaccess.PERMISSION_EDIT) {
			verb = utils.VerbPatch
		}

		checker, err := access.Compile(ctx, auth, authlib.ListRequest{
			Namespace: b.key.Namespace,
			Group:     b.key.Group,
			Resource:  b.key.Resource,
			Verb:      verb,
		})
		if err != nil {
			return nil, resource.AsErrorResult(err)
		}
		checkers := map[string]authlib.ItemChecker{
			b.key.Resource: checker,
		}

		// handle federation
		for _, federated := range req.Federated {
			checker, err := access.Compile(ctx, auth, authlib.ListRequest{
				Namespace: federated.Namespace,
				Group:     federated.Group,
				Resource:  federated.Resource,
				Verb:      utils.VerbList,
			})
			if err != nil {
				return nil, resource.AsErrorResult(err)
			}
			checkers[federated.Resource] = checker
		}

		searchrequest.Query = newPermissionScopedQuery(searchrequest.Query, checkers)
	}

	// Facets keyed by their name in the request.
	for k, v := range req.Facet {
		if searchrequest.Facets == nil {
			searchrequest.Facets = make(bleve.FacetsRequest)
		}
		searchrequest.Facets[k] = bleve.NewFacetRequest(v.Field, int(v.Limit))
	}

	// Add the sort fields
	sorting := getSortFields(req)
	searchrequest.SortBy(sorting)

	// When no sort fields are provided, sort by score if there is a query, otherwise sort by title
	if len(sorting) == 0 {
		if req.Query != "" && req.Query != "*" {
			searchrequest.Sort = append(searchrequest.Sort, &search.SortScore{
				Desc: true,
			})
		} else {
			searchrequest.Sort = append(searchrequest.Sort, &search.SortField{
				Field: resource.SEARCH_FIELD_TITLE_PHRASE,
				Desc:  false,
			})
		}
	}

	return searchrequest, nil
}
// safeInt64ToInt converts i64 to int. Values outside the int32 range are
// rejected with an error, so the conversion can never overflow.
func safeInt64ToInt(i64 int64) (int, error) {
	if i64 >= math.MinInt32 && i64 <= math.MaxInt32 {
		return int(i64), nil
	}
	return 0, fmt.Errorf("int64 value %d overflows int", i64)
}
// getSortFields translates the sort fields of req into bleve sort strings.
//
// Fields listed in textSortFields are replaced by their mapped field, dashboard
// fields get the search field prefix, and descending fields are prefixed with "-".
// The result is never nil.
func getSortFields(req *resourcepb.ResourceSearchRequest) []string {
	sorting := make([]string, 0, len(req.SortBy))
	for _, entry := range req.SortBy {
		name := entry.Field
		if mapped, found := textSortFields[name]; found {
			name = mapped
		}
		if slices.Contains(DashboardFields(), name) {
			name = resource.SEARCH_FIELD_PREFIX + name
		}
		if entry.Desc {
			name = "-" + name
		}
		sorting = append(sorting, name)
	}
	return sorting
}
// fields that we want to sort by the full text
// (getSortFields replaces a key of this map with its value)
var textSortFields = map[string]string{
resource.SEARCH_FIELD_TITLE: resource.SEARCH_FIELD_TITLE_PHRASE,
}
// lowerCase is the field-name suffix for which filterValue lower-cases the filter value.
const lowerCase = "phrase"
// termField fields to use termQuery for filtering
// (newQuery builds a terms query for these keys when the value contains a term character)
var termFields = []string{
resource.SEARCH_FIELD_TITLE,
}
// requirementQuery converts a "requirement" into a bleve query.
//
// Supported operators:
//   - "=" and "==": all values must match (conjunction)
//   - "in": at least one value must match (disjunction)
//   - "notin": none of the values may match
//
// For "=", "==" and "in", no values means "match everything" and a single value
// yields the plain value query. Every other operator results in a bad-request error.
// prefix is prepended to the requirement key to form the field name.
func requirementQuery(req *resourcepb.Requirement, prefix string) (query.Query, *resourcepb.ErrorResult) {
	// valueQueries returns one query per requirement value (nil when there are no values).
	valueQueries := func() []query.Query {
		var out []query.Query
		for _, value := range req.Values {
			out = append(out, newQuery(req.Key, filterValue(req.Key, value), prefix))
		}
		return out
	}

	switch op := selection.Operator(req.Operator); op {
	case selection.Equals, selection.DoubleEquals, selection.In:
		switch len(req.Values) {
		case 0:
			return query.NewMatchAllQuery(), nil
		case 1:
			return newQuery(req.Key, filterValue(req.Key, req.Values[0]), prefix), nil
		}
		if op == selection.In {
			return query.NewDisjunctionQuery(valueQueries()), nil
		}
		return query.NewConjunctionQuery(valueQueries()), nil

	case selection.NotIn:
		excluding := bleve.NewBooleanQuery()
		excluding.AddMustNot(valueQueries()...)
		// must still have a value
		excluding.AddMust(bleve.NewMatchAllQuery())
		return excluding, nil
	}

	// NotEquals, DoesNotExist, GreaterThan, LessThan, Exists and unknown operators
	// are not supported.
	return nil, resource.NewBadRequestError(
		fmt.Sprintf("unsupported query operation (%s %s %v)", req.Key, req.Operator, req.Values),
	)
}
// newQuery will create a query that will match the value or the tokens of the value.
//
//   - "*" matches everything.
//   - A value containing "*" becomes a wildcard query.
//   - For keys listed in termFields, a value containing a term character becomes a terms query.
//   - Everything else becomes a match query on the field prefix+key.
func newQuery(key string, value string, prefix string) query.Query {
	switch {
	case value == "*":
		return bleve.NewMatchAllQuery()
	case strings.Contains(value, "*"):
		// wildcard query is expensive - should be used with caution
		// NOTE(review): no field is set on the wildcard query, unlike the match query
		// below which is restricted to prefix+key — confirm this is intended.
		return bleve.NewWildcardQuery(value)
	}

	separator, splits := hasTerms(value)
	if slices.Contains(termFields, key) && splits {
		return newTermsQuery(key, value, separator, prefix)
	}

	match := bleve.NewMatchQuery(value)
	match.SetField(prefix + key)
	return match
}
// newTermsQuery will create a query that matches either the whole value as a
// single term, or all tokens obtained by splitting the value at delimiter.
func newTermsQuery(key string, value string, delimiter string, prefix string) query.Query {
	// The tokens are taken from the value as given, before the trailing space is removed.
	allTokens := newMatchAllTokensQuery(strings.Split(value, delimiter), key, prefix)

	// won't match with ending space
	exact := bleve.NewTermQuery(strings.TrimSuffix(value, " "))
	exact.SetField(prefix + key)

	return bleve.NewDisjunctionQuery(exact, allTokens)
}
// newMatchAllTokensQuery will create a query that will match on all tokens.
// Tokens that contain a term character are matched as terms, all other tokens
// with a match query. Every query is restricted to the field prefix+key.
func newMatchAllTokensQuery(tokens []string, key string, prefix string) query.Query {
	field := prefix + key
	all := bleve.NewConjunctionQuery()

	for _, token := range tokens {
		if _, splits := hasTerms(token); splits {
			term := bleve.NewTermQuery(token)
			term.SetField(field)
			all.AddQuery(term)
		} else {
			match := bleve.NewMatchQuery(token)
			match.SetField(field)
			all.AddQuery(match)
		}
	}
	return all
}
// filterValue returns the value to filter field by: v in lower case if the
// field name ends with the phrase suffix (lowerCase), v unchanged otherwise.
func filterValue(field string, v string) string {
	if !strings.HasSuffix(field, lowerCase) {
		return v
	}
	return strings.ToLower(v)
}
// hitsToTable converts bleve search hits into a resource table.
//
// The columns are derived from selectFields: "_all" selects the index's
// predefined list of fields, names starting with "labels." become string
// columns, and unknown names are silently skipped. When explain is set, a
// column with the score explanation is appended.
//
// Each hit becomes one row. The row key is decoded from the document ID and
// each cell is encoded with the encoder of its column. Cells without a value
// stay nil.
func (b *bleveIndex) hitsToTable(ctx context.Context, selectFields []string, hits search.DocumentMatchCollection, explain bool) (*resourcepb.ResourceTable, error) {
	_, span := b.tracing.Start(ctx, tracingPrexfixBleve+"hitsToTable")
	defer span.End()

	fields := []*resourcepb.ResourceTableColumnDefinition{}
	for _, name := range selectFields {
		if name == "_all" {
			// Work on a copy: appending to b.allFields directly (see "explain" below)
			// could write into the backing array shared by all searches on this index.
			fields = slices.Clone(b.allFields)
			break
		}

		f := b.standard.Field(name)
		if f == nil && b.fields != nil {
			f = b.fields.Field(name)
		}
		if f == nil {
			// Labels as a string
			if strings.HasPrefix(name, "labels.") {
				f = &resourcepb.ResourceTableColumnDefinition{
					Name: name,
					Type: resourcepb.ResourceTableColumnDefinition_STRING,
				}
			}

			// return nil, fmt.Errorf("unknown response field: " + name)
			if f == nil {
				continue // OK for now
			}
		}
		fields = append(fields, f)
	}
	if explain {
		fields = append(fields, b.standard.Field(resource.SEARCH_FIELD_EXPLAIN))
	}

	builder, err := resource.NewTableBuilder(fields)
	if err != nil {
		return nil, err
	}
	encoders := builder.Encoders()

	table := &resourcepb.ResourceTable{
		Columns: fields,
		Rows:    make([]*resourcepb.ResourceTableRow, hits.Len()),
	}
	for rowID, match := range hits {
		row := &resourcepb.ResourceTableRow{
			Key:   &resourcepb.ResourceKey{},
			Cells: make([][]byte, len(fields)),
		}
		table.Rows[rowID] = row

		err := resource.ReadSearchID(row.Key, match.ID)
		if err != nil {
			return nil, err
		}

		for i, f := range fields {
			// v is the raw value of the cell. It is assigned (not redeclared) in the
			// cases below, so that the error message at the end can report it.
			var v any
			switch f.Name {
			case resource.SEARCH_FIELD_ID:
				row.Cells[i] = []byte(match.ID)

			case resource.SEARCH_FIELD_SCORE:
				row.Cells[i], err = encoders[i](match.Score)

			case resource.SEARCH_FIELD_EXPLAIN:
				if match.Expl != nil {
					row.Cells[i], err = json.Marshal(match.Expl)
				}

			case resource.SEARCH_FIELD_LEGACY_ID:
				// The legacy ID is stored as a string label.
				v = match.Fields[resource.SEARCH_FIELD_LABELS+"."+resource.SEARCH_FIELD_LEGACY_ID]
				if str, ok := v.(string); ok {
					// A label that is not a number is encoded as 0.
					id, _ := strconv.ParseInt(str, 10, 64)
					row.Cells[i], err = encoders[i](id)
				}

			default:
				fieldName := f.Name
				// since the bleve index fields mix common and resource-specific fields, it is possible a conflict can happen
				// if a specific field is named the same as a common field
				v = match.Fields[fieldName]
				// fields that are specific to the resource get stored as fields.<fieldName>, so we need to check for that
				if v == nil {
					v = match.Fields[resource.SEARCH_FIELD_PREFIX+fieldName]
				}
				if v != nil {
					// Encode the value to protobuf
					row.Cells[i], err = encoders[i](v)
				}
			}
			if err != nil {
				return nil, fmt.Errorf("error encoding (row:%d/col:%d) %v %w", rowID, i, v, err)
			}
		}
	}

	return table, nil
}
// getAllFields returns the column definitions used when "_all" fields are
// requested: a fixed set of standard fields, followed by the custom fields
// whose priority is not greater than 10.
//
// custom may be nil. An error is returned if any of the fields is unknown
// (i.e. its definition is nil).
func getAllFields(standard resource.SearchableDocumentFields, custom resource.SearchableDocumentFields) ([]*resourcepb.ResourceTableColumnDefinition, error) {
	fields := []*resourcepb.ResourceTableColumnDefinition{
		standard.Field(resource.SEARCH_FIELD_ID),
		standard.Field(resource.SEARCH_FIELD_TITLE),
		standard.Field(resource.SEARCH_FIELD_TAGS),
		standard.Field(resource.SEARCH_FIELD_FOLDER),
		standard.Field(resource.SEARCH_FIELD_RV),
		standard.Field(resource.SEARCH_FIELD_CREATED),
		standard.Field(resource.SEARCH_FIELD_LEGACY_ID),
		standard.Field(resource.SEARCH_FIELD_MANAGER_KIND),
	}

	if custom != nil {
		for _, name := range custom.Fields() {
			f := custom.Field(name)
			if f == nil {
				// Report the missing definition instead of panicking on f.Priority.
				return nil, fmt.Errorf("invalid all field")
			}
			if f.Priority > 10 {
				continue
			}
			fields = append(fields, f)
		}
	}

	for _, field := range fields {
		if field == nil {
			return nil, fmt.Errorf("invalid all field")
		}
	}
	return fields, nil
}
// newResponseFacet converts a bleve facet result into its protobuf
// representation, copying the field name, the totals and all term counts.
func newResponseFacet(v *search.FacetResult) *resourcepb.ResourceSearchResponse_Facet {
	facet := &resourcepb.ResourceSearchResponse_Facet{
		Field:   v.Field,
		Total:   int64(v.Total),
		Missing: int64(v.Missing),
	}
	if v.Terms == nil {
		return facet
	}

	for _, term := range v.Terms.Terms() {
		facet.Terms = append(facet.Terms, &resourcepb.ResourceSearchResponse_TermFacet{
			Term:  term.Term,
			Count: int64(term.Count),
		})
	}
	return facet
}
// permissionScopedQuery wraps a bleve query and filters its matches with
// per-resource access checkers (see the Searcher method).
type permissionScopedQuery struct {
// Query is the wrapped query that produces the candidate documents.
query.Query
checkers map[string]authlib.ItemChecker // one checker per resource
log log.Logger
}
// newPermissionScopedQuery wraps q so that its matches are filtered by
// checkers, which holds one access checker per resource name.
func newPermissionScopedQuery(q query.Query, checkers map[string]authlib.ItemChecker) *permissionScopedQuery {
	scoped := &permissionScopedQuery{
		Query:    q,
		checkers: checkers,
	}
	scoped.log = log.New("search_permissions")
	return scoped
}
// Searcher returns the searcher of the wrapped query, filtered so that it only
// yields documents the per-resource checkers allow.
//
// For every candidate document the resource type and name are taken from the
// document ID and the folder from the "folder" doc value. A document is dropped
// if its ID cannot be resolved or parsed, if its doc values cannot be read, if
// there is no checker for its resource, or if the checker denies access.
func (q *permissionScopedQuery) Searcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) {
	searcher, err := q.Query.Searcher(ctx, i, m, options)
	if err != nil {
		return nil, err
	}

	dvReader, err := i.DocValueReader([]string{"folder"})
	if err != nil {
		// The inner searcher is not handed to the caller, so release it here.
		_ = searcher.Close()
		return nil, err
	}

	filteringSearcher := bleveSearch.NewFilteringSearcher(ctx, searcher, func(d *search.DocumentMatch) bool {
		// Errors are kept local to this invocation instead of being written to
		// the variable of the enclosing function.
		var err error

		// The doc ID has the format: <namespace>/<group>/<resourceType>/<name>
		// IndexInternalID will be the same as the doc ID when using an in-memory index, but when using a file-based
		// index it becomes a binary encoded number that has some other internal meaning. Using ExternalID() will get the
		// correct doc ID regardless of the index type.
		d.ID, err = i.ExternalID(d.IndexInternalID)
		if err != nil {
			q.log.Debug("Error getting external ID", "error", err)
			return false
		}

		parts := strings.Split(d.ID, "/")
		// Exclude doc if id isn't expected format
		if len(parts) != 4 {
			q.log.Debug("Unexpected document ID format", "id", d.ID)
			return false
		}
		ns, resourceType, name := parts[0], parts[2], parts[3]

		folder := ""
		err = dvReader.VisitDocValues(d.IndexInternalID, func(field string, value []byte) {
			if field == "folder" {
				folder = string(value)
			}
		})
		if err != nil {
			q.log.Debug("Error reading doc values", "error", err)
			return false
		}

		check, ok := q.checkers[resourceType]
		if !ok {
			q.log.Debug("No resource checker found", "resource", resourceType)
			return false
		}

		allowed := check(name, folder)
		if !allowed {
			q.log.Debug("Denying access", "ns", ns, "name", name, "folder", folder)
		}
		return allowed
	})

	return filteringSearcher, nil
}
// hasTerms reports whether v contains one of the TermCharacters, i.e. whether
// it will be split into multiple tokens. It returns the first entry of
// TermCharacters (in list order) that occurs in v, or "" and false if none does.
var hasTerms = func(v string) (string, bool) {
	for i := range TermCharacters {
		if sep := TermCharacters[i]; strings.Contains(v, sep) {
			return sep, true
		}
	}
	return "", false
}

// TermCharacters characters that will be used to determine if a value is split into tokens.
// The order is significant: hasTerms returns the first entry found in a value.
var TermCharacters = []string{
	" ", "-", "_", ".", ",", ":", ";", "?", "!", "@", "#", "$", "%", "^", "&", "*", "(", ")", "+",
	"=", "{", "}", "[", "]", "|", "\\", "/", "<", ">", "~", "`",
	"'", "\"",
}