mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 00:35:12 +08:00

* update storage-api to only build index if it owns the namespace --------- Co-authored-by: Mustafa Sencer Özcan <mustafasencer.ozcan@grafana.com>
1400 lines
40 KiB
Go
1400 lines
40 KiB
Go
package resource
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log/slog"
|
||
"net/http"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/google/uuid"
|
||
"github.com/prometheus/client_golang/prometheus"
|
||
"go.opentelemetry.io/otel/trace"
|
||
"go.opentelemetry.io/otel/trace/noop"
|
||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||
|
||
claims "github.com/grafana/authlib/types"
|
||
"github.com/grafana/dskit/backoff"
|
||
"github.com/grafana/dskit/ring"
|
||
|
||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||
"github.com/grafana/grafana/pkg/util/scheduler"
|
||
)
|
||
|
||
const (
|
||
// DefaultMaxBackoff is the default maximum backoff duration for enqueue operations.
|
||
DefaultMaxBackoff = 1 * time.Second
|
||
// DefaultMinBackoff is the default minimum backoff duration for enqueue operations.
|
||
DefaultMinBackoff = 100 * time.Millisecond
|
||
// DefaultMaxRetries is the default maximum number of retries for enqueue operations.
|
||
DefaultMaxRetries = 3
|
||
)
|
||
|
||
// ResourceServer implements all gRPC services
|
||
type ResourceServer interface {
|
||
resourcepb.ResourceStoreServer
|
||
resourcepb.BulkStoreServer
|
||
resourcepb.ResourceIndexServer
|
||
resourcepb.ManagedObjectIndexServer
|
||
resourcepb.BlobStoreServer
|
||
resourcepb.DiagnosticsServer
|
||
}
|
||
|
||
type ListIterator interface {
|
||
// Next advances iterator and returns true if there is next value is available from the iterator.
|
||
// Error() should be checked after every call of Next(), even when Next() returns true.
|
||
Next() bool // sql.Rows
|
||
|
||
// Error returns iterator error, if any. This should be checked after any Next() call.
|
||
// (Some iterator implementations return true from Next, but also set the error at the same time).
|
||
Error() error
|
||
|
||
// ContinueToken returns the token that can be used to start iterating *after* this item
|
||
ContinueToken() string
|
||
|
||
// ResourceVersion of the current item
|
||
ResourceVersion() int64
|
||
|
||
// Namespace of the current item
|
||
// Used for fast(er) authz filtering
|
||
Namespace() string
|
||
|
||
// Name of the current item
|
||
// Used for fast(er) authz filtering
|
||
Name() string
|
||
|
||
// Folder of the current item
|
||
// Used for fast(er) authz filtering
|
||
Folder() string
|
||
|
||
// Value for the current item
|
||
Value() []byte
|
||
}
|
||
|
||
type BackendReadResponse struct {
|
||
// Metadata
|
||
Key *resourcepb.ResourceKey
|
||
Folder string
|
||
|
||
// GUID that is used internally
|
||
GUID string
|
||
// The new resource version
|
||
ResourceVersion int64
|
||
// The properties
|
||
Value []byte
|
||
// Error details
|
||
Error *resourcepb.ErrorResult
|
||
}
|
||
|
||
// The StorageBackend is an internal abstraction that supports interacting with
|
||
// the underlying raw storage medium. This interface is never exposed directly,
|
||
// it is provided by concrete instances that actually write values.
|
||
type StorageBackend interface {
|
||
// Write a Create/Update/Delete,
|
||
// NOTE: the contents of WriteEvent have been validated
|
||
// Return the revisionVersion for this event or error
|
||
WriteEvent(context.Context, WriteEvent) (int64, error)
|
||
|
||
// Read a resource from storage optionally at an explicit version
|
||
ReadResource(context.Context, *resourcepb.ReadRequest) *BackendReadResponse
|
||
|
||
// When the ResourceServer executes a List request, this iterator will
|
||
// query the backend for potential results. All results will be
|
||
// checked against the kubernetes requirements before finally returning
|
||
// results. The list options can be used to improve performance
|
||
// but are the the final answer.
|
||
ListIterator(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error)
|
||
|
||
// ListHistory is like ListIterator, but it returns the history of a resource
|
||
ListHistory(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error)
|
||
|
||
// Get all events from the store
|
||
// For HA setups, this will be more events than the local WriteEvent above!
|
||
WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error)
|
||
|
||
// Get resource stats within the storage backend. When namespace is empty, it will apply to all
|
||
GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error)
|
||
}
|
||
|
||
type ResourceStats struct {
|
||
NamespacedResource
|
||
|
||
Count int64
|
||
ResourceVersion int64
|
||
}
|
||
|
||
// This interface is not exposed to end users directly
|
||
// Access to this interface is already gated by access control
|
||
type BlobSupport interface {
|
||
// Indicates if storage layer supports signed urls
|
||
SupportsSignedURLs() bool
|
||
|
||
// Get the raw blob bytes and metadata -- limited to protobuf message size
|
||
// For larger payloads, we should use presigned URLs to upload from the client
|
||
PutResourceBlob(context.Context, *resourcepb.PutBlobRequest) (*resourcepb.PutBlobResponse, error)
|
||
|
||
// Get blob contents. When possible, this will return a signed URL
|
||
// For large payloads, signed URLs are required to avoid protobuf message size limits
|
||
GetResourceBlob(ctx context.Context, resource *resourcepb.ResourceKey, info *utils.BlobInfo, mustProxy bool) (*resourcepb.GetBlobResponse, error)
|
||
|
||
// TODO? List+Delete? This is for admin access
|
||
}
|
||
|
||
type QOSEnqueuer interface {
|
||
Enqueue(ctx context.Context, tenantID string, runnable func()) error
|
||
}
|
||
|
||
type BlobConfig struct {
|
||
// The CDK configuration URL
|
||
URL string
|
||
|
||
// Directly implemented blob support
|
||
Backend BlobSupport
|
||
}
|
||
|
||
// Passed as input to the constructor
|
||
type SearchOptions struct {
|
||
// The raw index backend (eg, bleve, frames, parquet, etc)
|
||
Backend SearchBackend
|
||
|
||
// The supported resource types
|
||
Resources DocumentBuilderSupplier
|
||
|
||
// How many threads should build indexes
|
||
WorkerThreads int
|
||
|
||
// Skip building index on startup for small indexes
|
||
InitMinCount int
|
||
|
||
// Build empty index on startup for large indexes so that
|
||
// we don't re-attempt to build the index later.
|
||
InitMaxCount int
|
||
|
||
// Channel to watch for index events (for testing)
|
||
IndexEventsChan chan *IndexEvent
|
||
|
||
// Interval for periodic index rebuilds (0 disables periodic rebuilds)
|
||
RebuildInterval time.Duration
|
||
|
||
Ring *ring.Ring
|
||
}
|
||
|
||
type ResourceServerOptions struct {
|
||
// OTel tracer
|
||
Tracer trace.Tracer
|
||
|
||
// Real storage backend
|
||
Backend StorageBackend
|
||
|
||
// The blob configuration
|
||
Blob BlobConfig
|
||
|
||
// Search options
|
||
Search SearchOptions
|
||
|
||
// Diagnostics
|
||
Diagnostics resourcepb.DiagnosticsServer
|
||
|
||
// Check if a user has access to write folders
|
||
// When this is nil, no resources can have folders configured
|
||
WriteHooks WriteAccessHooks
|
||
|
||
// Link RBAC
|
||
AccessClient claims.AccessClient
|
||
|
||
// Callbacks for startup and shutdown
|
||
Lifecycle LifecycleHooks
|
||
|
||
// Get the current time in unix millis
|
||
Now func() int64
|
||
|
||
// Registerer to register prometheus Metrics for the Resource server
|
||
Reg prometheus.Registerer
|
||
|
||
storageMetrics *StorageMetrics
|
||
|
||
IndexMetrics *BleveIndexMetrics
|
||
|
||
// MaxPageSizeBytes is the maximum size of a page in bytes.
|
||
MaxPageSizeBytes int
|
||
|
||
// QOSQueue is the quality of service queue used to enqueue
|
||
QOSQueue QOSEnqueuer
|
||
|
||
Ring *ring.Ring
|
||
RingLifecycler *ring.BasicLifecycler
|
||
}
|
||
|
||
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
|
||
if opts.Tracer == nil {
|
||
opts.Tracer = noop.NewTracerProvider().Tracer("resource-server")
|
||
}
|
||
|
||
if opts.Backend == nil {
|
||
return nil, fmt.Errorf("missing Backend implementation")
|
||
}
|
||
|
||
if opts.AccessClient == nil {
|
||
opts.AccessClient = claims.FixedAccessClient(true) // everything OK
|
||
}
|
||
|
||
if opts.Diagnostics == nil {
|
||
opts.Diagnostics = &noopService{}
|
||
}
|
||
|
||
if opts.Now == nil {
|
||
opts.Now = func() int64 {
|
||
return time.Now().UnixMilli()
|
||
}
|
||
}
|
||
|
||
if opts.MaxPageSizeBytes <= 0 {
|
||
// By default, we use 2MB for the page size.
|
||
opts.MaxPageSizeBytes = 1024 * 1024 * 2
|
||
}
|
||
|
||
if opts.QOSQueue == nil {
|
||
opts.QOSQueue = scheduler.NewNoopQueue()
|
||
}
|
||
|
||
// Initialize the blob storage
|
||
blobstore := opts.Blob.Backend
|
||
if blobstore == nil {
|
||
if opts.Blob.URL != "" {
|
||
ctx := context.Background()
|
||
bucket, err := OpenBlobBucket(ctx, opts.Blob.URL)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
blobstore, err = NewCDKBlobSupport(ctx, CDKBlobSupportOptions{
|
||
Tracer: opts.Tracer,
|
||
Bucket: NewInstrumentedBucket(bucket, opts.Reg, opts.Tracer),
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
} else {
|
||
// Check if the backend supports blob storage
|
||
blobstore, _ = opts.Backend.(BlobSupport)
|
||
}
|
||
}
|
||
|
||
logger := slog.Default().With("logger", "resource-server")
|
||
|
||
// Make this cancelable
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
s := &server{
|
||
tracer: opts.Tracer,
|
||
log: logger,
|
||
backend: opts.Backend,
|
||
blob: blobstore,
|
||
diagnostics: opts.Diagnostics,
|
||
access: opts.AccessClient,
|
||
writeHooks: opts.WriteHooks,
|
||
lifecycle: opts.Lifecycle,
|
||
now: opts.Now,
|
||
ctx: ctx,
|
||
cancel: cancel,
|
||
storageMetrics: opts.storageMetrics,
|
||
indexMetrics: opts.IndexMetrics,
|
||
maxPageSizeBytes: opts.MaxPageSizeBytes,
|
||
reg: opts.Reg,
|
||
queue: opts.QOSQueue,
|
||
}
|
||
|
||
if opts.Search.Resources != nil {
|
||
var err error
|
||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.Tracer, opts.IndexMetrics, opts.Ring, opts.RingLifecycler)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
err := s.Init(ctx)
|
||
if err != nil {
|
||
s.log.Error("resource server init failed", "error", err)
|
||
return nil, err
|
||
}
|
||
|
||
return s, nil
|
||
}
|
||
|
||
var _ ResourceServer = &server{}
|
||
|
||
type server struct {
|
||
tracer trace.Tracer
|
||
log *slog.Logger
|
||
backend StorageBackend
|
||
blob BlobSupport
|
||
search *searchSupport
|
||
diagnostics resourcepb.DiagnosticsServer
|
||
access claims.AccessClient
|
||
writeHooks WriteAccessHooks
|
||
lifecycle LifecycleHooks
|
||
now func() int64
|
||
mostRecentRV atomic.Int64 // The most recent resource version seen by the server
|
||
storageMetrics *StorageMetrics
|
||
indexMetrics *BleveIndexMetrics
|
||
|
||
// Background watch task -- this has permissions for everything
|
||
ctx context.Context
|
||
cancel context.CancelFunc
|
||
broadcaster Broadcaster[*WrittenEvent]
|
||
|
||
// init checking
|
||
once sync.Once
|
||
initErr error
|
||
|
||
maxPageSizeBytes int
|
||
reg prometheus.Registerer
|
||
queue QOSEnqueuer
|
||
}
|
||
|
||
// Init implements ResourceServer.
|
||
func (s *server) Init(ctx context.Context) error {
|
||
s.once.Do(func() {
|
||
// Call lifecycle hooks
|
||
if s.lifecycle != nil {
|
||
err := s.lifecycle.Init(ctx)
|
||
if err != nil {
|
||
s.initErr = fmt.Errorf("initialize Resource Server: %w", err)
|
||
}
|
||
}
|
||
|
||
// initialize the search index
|
||
if s.initErr == nil && s.search != nil {
|
||
s.initErr = s.search.init(ctx)
|
||
}
|
||
|
||
// Start watching for changes
|
||
if s.initErr == nil {
|
||
s.initErr = s.initWatcher()
|
||
}
|
||
|
||
if s.initErr != nil {
|
||
s.log.Error("error running resource server init", "error", s.initErr)
|
||
}
|
||
})
|
||
return s.initErr
|
||
}
|
||
|
||
func (s *server) Stop(ctx context.Context) error {
|
||
s.initErr = fmt.Errorf("service is stopping")
|
||
|
||
var stopFailed bool
|
||
if s.lifecycle != nil {
|
||
err := s.lifecycle.Stop(ctx)
|
||
if err != nil {
|
||
stopFailed = true
|
||
s.initErr = fmt.Errorf("service stopeed with error: %w", err)
|
||
}
|
||
}
|
||
|
||
// Stops the streaming
|
||
s.cancel()
|
||
|
||
// mark the value as done
|
||
if stopFailed {
|
||
return s.initErr
|
||
}
|
||
s.initErr = fmt.Errorf("service is stopped")
|
||
|
||
return nil
|
||
}
|
||
|
||
// Old value indicates an update -- otherwise a create
|
||
//
|
||
//nolint:gocyclo
|
||
func (s *server) newEvent(ctx context.Context, user claims.AuthInfo, key *resourcepb.ResourceKey, value, oldValue []byte) (*WriteEvent, *resourcepb.ErrorResult) {
|
||
tmp := &unstructured.Unstructured{}
|
||
err := tmp.UnmarshalJSON(value)
|
||
if err != nil {
|
||
return nil, AsErrorResult(err)
|
||
}
|
||
obj, err := utils.MetaAccessor(tmp)
|
||
if err != nil {
|
||
return nil, AsErrorResult(err)
|
||
}
|
||
|
||
if obj.GetUID() == "" {
|
||
// TODO! once https://github.com/grafana/grafana/pull/96086 is deployed everywhere
|
||
// return nil, NewBadRequestError("object is missing UID")
|
||
s.log.Error("object is missing UID", "key", key)
|
||
}
|
||
|
||
if obj.GetResourceVersion() != "" {
|
||
s.log.Error("object must not include a resource version", "key", key)
|
||
}
|
||
|
||
// Make sure the command labels are not saved
|
||
for k := range obj.GetLabels() {
|
||
if k == utils.LabelKeyGetHistory || k == utils.LabelKeyGetTrash || k == utils.LabelGetFullpath {
|
||
return nil, NewBadRequestError("can not save label: " + k)
|
||
}
|
||
}
|
||
|
||
if obj.GetAnnotation(utils.AnnoKeyGrantPermissions) != "" {
|
||
return nil, NewBadRequestError("can not save annotation: " + utils.AnnoKeyGrantPermissions)
|
||
}
|
||
|
||
// Verify that this resource can reference secure values
|
||
secure, err := obj.GetSecureValues()
|
||
if err != nil {
|
||
return nil, AsErrorResult(err)
|
||
}
|
||
if len(secure) > 0 {
|
||
// See: https://github.com/grafana/grafana/pull/107803
|
||
return nil, NewBadRequestError("Saving secure values is not yet supported")
|
||
}
|
||
|
||
event := &WriteEvent{
|
||
Value: value,
|
||
Key: key,
|
||
Object: obj,
|
||
GUID: uuid.New().String(),
|
||
}
|
||
|
||
if oldValue == nil {
|
||
event.Type = resourcepb.WatchEvent_ADDED
|
||
} else {
|
||
event.Type = resourcepb.WatchEvent_MODIFIED
|
||
|
||
temp := &unstructured.Unstructured{}
|
||
err = temp.UnmarshalJSON(oldValue)
|
||
if err != nil {
|
||
return nil, AsErrorResult(err)
|
||
}
|
||
event.ObjectOld, err = utils.MetaAccessor(temp)
|
||
if err != nil {
|
||
return nil, AsErrorResult(err)
|
||
}
|
||
}
|
||
|
||
if key.Namespace != obj.GetNamespace() {
|
||
return nil, NewBadRequestError("key/namespace do not match")
|
||
}
|
||
|
||
gvk := obj.GetGroupVersionKind()
|
||
if gvk.Kind == "" {
|
||
return nil, NewBadRequestError("expecting resources with a kind in the body")
|
||
}
|
||
if gvk.Version == "" {
|
||
return nil, NewBadRequestError("expecting resources with an apiVersion")
|
||
}
|
||
if gvk.Group != "" && gvk.Group != key.Group {
|
||
return nil, NewBadRequestError(
|
||
fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group),
|
||
)
|
||
}
|
||
|
||
// This needs to be a create function
|
||
if key.Name == "" {
|
||
if obj.GetName() == "" {
|
||
return nil, NewBadRequestError("missing name")
|
||
}
|
||
key.Name = obj.GetName()
|
||
} else if key.Name != obj.GetName() {
|
||
return nil, NewBadRequestError(
|
||
fmt.Sprintf("key/name do not match (key: %s, name: %s)", key.Name, obj.GetName()))
|
||
}
|
||
if err := validateName(obj.GetName()); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// For folder moves, we need to check permissions on both folders
|
||
if s.isFolderMove(event) {
|
||
if err := s.checkFolderMovePermissions(ctx, user, key, event.ObjectOld.GetFolder(), obj.GetFolder()); err != nil {
|
||
return nil, err
|
||
}
|
||
} else {
|
||
// Regular permission check for create/update
|
||
check := claims.CheckRequest{
|
||
Verb: utils.VerbCreate,
|
||
Group: key.Group,
|
||
Resource: key.Resource,
|
||
Namespace: key.Namespace,
|
||
}
|
||
|
||
if event.Type == resourcepb.WatchEvent_MODIFIED {
|
||
check.Verb = utils.VerbUpdate
|
||
check.Name = key.Name
|
||
}
|
||
|
||
check.Folder = obj.GetFolder()
|
||
a, err := s.access.Check(ctx, user, check)
|
||
if err != nil {
|
||
return nil, AsErrorResult(err)
|
||
}
|
||
if !a.Allowed {
|
||
return nil, &resourcepb.ErrorResult{
|
||
Code: http.StatusForbidden,
|
||
}
|
||
}
|
||
}
|
||
|
||
m, ok := obj.GetManagerProperties()
|
||
if ok && m.Kind == utils.ManagerKindRepo {
|
||
err = s.writeHooks.CanWriteValueFromRepository(ctx, user, m.Identity)
|
||
if err != nil {
|
||
return nil, AsErrorResult(err)
|
||
}
|
||
}
|
||
return event, nil
|
||
}
|
||
|
||
// isFolderMove determines if an event represents a resource being moved between folders
|
||
func (s *server) isFolderMove(event *WriteEvent) bool {
|
||
return event.Type == resourcepb.WatchEvent_MODIFIED &&
|
||
event.ObjectOld != nil &&
|
||
event.ObjectOld.GetFolder() != event.Object.GetFolder()
|
||
}
|
||
|
||
// checkFolderMovePermissions handles permission checks when a resource is being moved between folders
|
||
func (s *server) checkFolderMovePermissions(ctx context.Context, user claims.AuthInfo, key *resourcepb.ResourceKey, oldFolder, newFolder string) *resourcepb.ErrorResult {
|
||
// First check if user can update the resource in the original folder
|
||
updateCheck := claims.CheckRequest{
|
||
Verb: utils.VerbUpdate,
|
||
Group: key.Group,
|
||
Resource: key.Resource,
|
||
Namespace: key.Namespace,
|
||
Name: key.Name,
|
||
Folder: oldFolder,
|
||
}
|
||
|
||
a, err := s.access.Check(ctx, user, updateCheck)
|
||
if err != nil {
|
||
return AsErrorResult(err)
|
||
}
|
||
if !a.Allowed {
|
||
return &resourcepb.ErrorResult{
|
||
Code: http.StatusForbidden,
|
||
Message: "not allowed to update resource in the source folder",
|
||
}
|
||
}
|
||
|
||
// Then check if user can create the resource in the destination folder
|
||
createCheck := claims.CheckRequest{
|
||
Verb: utils.VerbCreate,
|
||
Group: key.Group,
|
||
Resource: key.Resource,
|
||
Namespace: key.Namespace,
|
||
Folder: newFolder,
|
||
}
|
||
|
||
a, err = s.access.Check(ctx, user, createCheck)
|
||
if err != nil {
|
||
return AsErrorResult(err)
|
||
}
|
||
if !a.Allowed {
|
||
return &resourcepb.ErrorResult{
|
||
Code: http.StatusForbidden,
|
||
Message: "not allowed to create resource in the destination folder",
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *server) Create(ctx context.Context, req *resourcepb.CreateRequest) (*resourcepb.CreateResponse, error) {
|
||
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
|
||
defer span.End()
|
||
|
||
rsp := &resourcepb.CreateResponse{}
|
||
user, ok := claims.AuthInfoFrom(ctx)
|
||
if !ok || user == nil {
|
||
rsp.Error = &resourcepb.ErrorResult{
|
||
Message: "no user found in context",
|
||
Code: http.StatusUnauthorized,
|
||
}
|
||
return rsp, nil
|
||
}
|
||
|
||
var (
|
||
res *resourcepb.CreateResponse
|
||
err error
|
||
)
|
||
runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
|
||
res, err = s.create(ctx, user, req)
|
||
})
|
||
if runErr != nil {
|
||
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.CreateResponse {
|
||
return &resourcepb.CreateResponse{Error: e}
|
||
})
|
||
}
|
||
|
||
return res, err
|
||
}
|
||
|
||
func (s *server) create(ctx context.Context, user claims.AuthInfo, req *resourcepb.CreateRequest) (*resourcepb.CreateResponse, error) {
|
||
rsp := &resourcepb.CreateResponse{}
|
||
|
||
event, e := s.newEvent(ctx, user, req.Key, req.Value, nil)
|
||
if e != nil {
|
||
rsp.Error = e
|
||
return rsp, nil
|
||
}
|
||
|
||
// If the resource already exists, the create will return an already exists error that is remapped appropriately by AsErrorResult.
|
||
// This also benefits from ACID behaviours on our databases, so we avoid race conditions.
|
||
var err error
|
||
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event)
|
||
if err != nil {
|
||
rsp.Error = AsErrorResult(err)
|
||
}
|
||
s.log.Debug("server.WriteEvent", "type", event.Type, "rv", rsp.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "name", event.Key.Name, "resource", event.Key.Resource)
|
||
return rsp, nil
|
||
}
|
||
|
||
func (s *server) Update(ctx context.Context, req *resourcepb.UpdateRequest) (*resourcepb.UpdateResponse, error) {
|
||
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
|
||
defer span.End()
|
||
|
||
rsp := &resourcepb.UpdateResponse{}
|
||
user, ok := claims.AuthInfoFrom(ctx)
|
||
if !ok || user == nil {
|
||
rsp.Error = &resourcepb.ErrorResult{
|
||
Message: "no user found in context",
|
||
Code: http.StatusUnauthorized,
|
||
}
|
||
return rsp, nil
|
||
}
|
||
if req.ResourceVersion < 0 {
|
||
rsp.Error = AsErrorResult(apierrors.NewBadRequest("update must include the previous version"))
|
||
return rsp, nil
|
||
}
|
||
|
||
var (
|
||
res *resourcepb.UpdateResponse
|
||
err error
|
||
)
|
||
runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
|
||
res, err = s.update(ctx, user, req)
|
||
})
|
||
if runErr != nil {
|
||
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.UpdateResponse {
|
||
return &resourcepb.UpdateResponse{Error: e}
|
||
})
|
||
}
|
||
|
||
return res, err
|
||
}
|
||
|
||
func (s *server) update(ctx context.Context, user claims.AuthInfo, req *resourcepb.UpdateRequest) (*resourcepb.UpdateResponse, error) {
|
||
rsp := &resourcepb.UpdateResponse{}
|
||
latest := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{
|
||
Key: req.Key,
|
||
})
|
||
if latest.Error != nil {
|
||
return rsp, nil
|
||
}
|
||
if latest.Value == nil {
|
||
rsp.Error = NewBadRequestError("current value does not exist")
|
||
return rsp, nil
|
||
}
|
||
|
||
if req.ResourceVersion > 0 && latest.ResourceVersion != req.ResourceVersion {
|
||
return nil, ErrOptimisticLockingFailed
|
||
}
|
||
|
||
event, e := s.newEvent(ctx, user, req.Key, req.Value, latest.Value)
|
||
if e != nil {
|
||
rsp.Error = e
|
||
return rsp, nil
|
||
}
|
||
|
||
event.Type = resourcepb.WatchEvent_MODIFIED
|
||
event.PreviousRV = latest.ResourceVersion
|
||
|
||
var err error
|
||
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event)
|
||
if err != nil {
|
||
rsp.Error = AsErrorResult(err)
|
||
}
|
||
return rsp, nil
|
||
}
|
||
|
||
func (s *server) Delete(ctx context.Context, req *resourcepb.DeleteRequest) (*resourcepb.DeleteResponse, error) {
|
||
ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
|
||
defer span.End()
|
||
|
||
rsp := &resourcepb.DeleteResponse{}
|
||
if req.ResourceVersion < 0 {
|
||
return nil, apierrors.NewBadRequest("update must include the previous version")
|
||
}
|
||
user, ok := claims.AuthInfoFrom(ctx)
|
||
if !ok || user == nil {
|
||
rsp.Error = &resourcepb.ErrorResult{
|
||
Message: "no user found in context",
|
||
Code: http.StatusUnauthorized,
|
||
}
|
||
return rsp, nil
|
||
}
|
||
|
||
var (
|
||
res *resourcepb.DeleteResponse
|
||
err error
|
||
)
|
||
|
||
runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
|
||
res, err = s.delete(ctx, user, req)
|
||
})
|
||
if runErr != nil {
|
||
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.DeleteResponse {
|
||
return &resourcepb.DeleteResponse{Error: e}
|
||
})
|
||
}
|
||
|
||
return res, err
|
||
}
|
||
|
||
func (s *server) delete(ctx context.Context, user claims.AuthInfo, req *resourcepb.DeleteRequest) (*resourcepb.DeleteResponse, error) {
|
||
rsp := &resourcepb.DeleteResponse{}
|
||
latest := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{
|
||
Key: req.Key,
|
||
})
|
||
if latest.Error != nil {
|
||
rsp.Error = latest.Error
|
||
return rsp, nil
|
||
}
|
||
if req.ResourceVersion > 0 && latest.ResourceVersion != req.ResourceVersion {
|
||
rsp.Error = AsErrorResult(ErrOptimisticLockingFailed)
|
||
return rsp, nil
|
||
}
|
||
|
||
access, err := s.access.Check(ctx, user, claims.CheckRequest{
|
||
Verb: "delete",
|
||
Group: req.Key.Group,
|
||
Resource: req.Key.Resource,
|
||
Namespace: req.Key.Namespace,
|
||
Name: req.Key.Name,
|
||
Folder: latest.Folder,
|
||
})
|
||
if err != nil {
|
||
rsp.Error = AsErrorResult(err)
|
||
return rsp, nil
|
||
}
|
||
if !access.Allowed {
|
||
rsp.Error = &resourcepb.ErrorResult{
|
||
Code: http.StatusForbidden,
|
||
}
|
||
return rsp, nil
|
||
}
|
||
|
||
now := metav1.NewTime(time.UnixMilli(s.now()))
|
||
event := WriteEvent{
|
||
Key: req.Key,
|
||
Type: resourcepb.WatchEvent_DELETED,
|
||
PreviousRV: latest.ResourceVersion,
|
||
GUID: uuid.New().String(),
|
||
}
|
||
marker := &unstructured.Unstructured{}
|
||
err = json.Unmarshal(latest.Value, marker)
|
||
if err != nil {
|
||
return nil, apierrors.NewBadRequest(
|
||
fmt.Sprintf("unable to read previous object, %v", err))
|
||
}
|
||
oldObj, err := utils.MetaAccessor(marker)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
obj, err := utils.MetaAccessor(marker)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
obj.SetDeletionTimestamp(&now)
|
||
obj.SetUpdatedTimestamp(&now.Time)
|
||
obj.SetManagedFields(nil)
|
||
obj.SetFinalizers(nil)
|
||
obj.SetUpdatedBy(user.GetUID())
|
||
obj.SetGeneration(utils.DeletedGeneration)
|
||
obj.SetAnnotation(utils.AnnoKeyKubectlLastAppliedConfig, "") // clears it
|
||
event.ObjectOld = oldObj
|
||
event.Object = obj
|
||
event.Value, err = marker.MarshalJSON()
|
||
if err != nil {
|
||
return nil, apierrors.NewBadRequest(
|
||
fmt.Sprintf("unable creating deletion marker, %v", err))
|
||
}
|
||
|
||
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event)
|
||
if err != nil {
|
||
rsp.Error = AsErrorResult(err)
|
||
}
|
||
return rsp, nil
|
||
}
|
||
|
||
func (s *server) Read(ctx context.Context, req *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
|
||
user, ok := claims.AuthInfoFrom(ctx)
|
||
if !ok || user == nil {
|
||
return &resourcepb.ReadResponse{
|
||
Error: &resourcepb.ErrorResult{
|
||
Message: "no user found in context",
|
||
Code: http.StatusUnauthorized,
|
||
}}, nil
|
||
}
|
||
|
||
// if req.Key.Group == "" {
|
||
// status, _ := AsErrorResult(apierrors.NewBadRequest("missing group"))
|
||
// return &ReadResponse{Status: status}, nil
|
||
// }
|
||
if req.Key.Resource == "" {
|
||
return &resourcepb.ReadResponse{Error: NewBadRequestError("missing resource")}, nil
|
||
}
|
||
|
||
var (
|
||
res *resourcepb.ReadResponse
|
||
err error
|
||
)
|
||
runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
|
||
res, err = s.read(ctx, user, req)
|
||
})
|
||
if runErr != nil {
|
||
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.ReadResponse {
|
||
return &resourcepb.ReadResponse{Error: e}
|
||
})
|
||
}
|
||
|
||
return res, err
|
||
}
|
||
|
||
func (s *server) read(ctx context.Context, user claims.AuthInfo, req *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
|
||
rsp := s.backend.ReadResource(ctx, req)
|
||
if rsp.Error != nil && rsp.Error.Code == http.StatusNotFound {
|
||
return &resourcepb.ReadResponse{Error: rsp.Error}, nil
|
||
}
|
||
|
||
a, err := s.access.Check(ctx, user, claims.CheckRequest{
|
||
Verb: "get",
|
||
Group: req.Key.Group,
|
||
Resource: req.Key.Resource,
|
||
Namespace: req.Key.Namespace,
|
||
Name: req.Key.Name,
|
||
Folder: rsp.Folder,
|
||
})
|
||
if err != nil {
|
||
return &resourcepb.ReadResponse{Error: AsErrorResult(err)}, nil
|
||
}
|
||
if !a.Allowed {
|
||
return &resourcepb.ReadResponse{
|
||
Error: &resourcepb.ErrorResult{
|
||
Code: http.StatusForbidden,
|
||
}}, nil
|
||
}
|
||
return &resourcepb.ReadResponse{
|
||
ResourceVersion: rsp.ResourceVersion,
|
||
Value: rsp.Value,
|
||
Error: rsp.Error,
|
||
}, nil
|
||
}
|
||
|
||
func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resourcepb.ListResponse, error) {
|
||
ctx, span := s.tracer.Start(ctx, "storage_server.List")
|
||
defer span.End()
|
||
|
||
// The history + trash queries do not yet support additional filters
|
||
if req.Source != resourcepb.ListRequest_STORE {
|
||
if len(req.Options.Fields) > 0 || len(req.Options.Labels) > 0 {
|
||
return &resourcepb.ListResponse{
|
||
Error: NewBadRequestError("unexpected field/label selector for history query"),
|
||
}, nil
|
||
}
|
||
}
|
||
|
||
user, ok := claims.AuthInfoFrom(ctx)
|
||
if !ok || user == nil {
|
||
return &resourcepb.ListResponse{
|
||
Error: &resourcepb.ErrorResult{
|
||
Message: "no user found in context",
|
||
Code: http.StatusUnauthorized,
|
||
}}, nil
|
||
}
|
||
|
||
// Do not allow label query for trash/history
|
||
for _, v := range req.Options.Labels {
|
||
if v.Key == utils.LabelKeyGetHistory || v.Key == utils.LabelKeyGetTrash {
|
||
return &resourcepb.ListResponse{Error: NewBadRequestError("history and trash must be requested as source")}, nil
|
||
}
|
||
}
|
||
|
||
if req.Limit < 1 {
|
||
req.Limit = 50 // default max 50 items in a page
|
||
}
|
||
maxPageBytes := s.maxPageSizeBytes
|
||
pageBytes := 0
|
||
rsp := &resourcepb.ListResponse{}
|
||
|
||
key := req.Options.Key
|
||
checker, err := s.access.Compile(ctx, user, claims.ListRequest{
|
||
Group: key.Group,
|
||
Resource: key.Resource,
|
||
Namespace: key.Namespace,
|
||
Verb: utils.VerbGet,
|
||
})
|
||
var trashChecker claims.ItemChecker // only for trash
|
||
if req.Source == resourcepb.ListRequest_TRASH {
|
||
trashChecker, err = s.access.Compile(ctx, user, claims.ListRequest{
|
||
Group: key.Group,
|
||
Resource: key.Resource,
|
||
Namespace: key.Namespace,
|
||
Verb: utils.VerbSetPermissions, // Basically Admin
|
||
})
|
||
if err != nil {
|
||
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
|
||
}
|
||
}
|
||
if err != nil {
|
||
return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil
|
||
}
|
||
if checker == nil {
|
||
return &resourcepb.ListResponse{Error: &resourcepb.ErrorResult{
|
||
Code: http.StatusForbidden,
|
||
}}, nil
|
||
}
|
||
|
||
iterFunc := func(iter ListIterator) error {
|
||
for iter.Next() {
|
||
if err := iter.Error(); err != nil {
|
||
return err
|
||
}
|
||
|
||
item := &resourcepb.ResourceWrapper{
|
||
ResourceVersion: iter.ResourceVersion(),
|
||
Value: iter.Value(),
|
||
}
|
||
// Trash is only accessible to admins or the user who deleted the object
|
||
if req.Source == resourcepb.ListRequest_TRASH {
|
||
if !s.isTrashItemAuthorized(ctx, iter, trashChecker) {
|
||
continue
|
||
}
|
||
} else if !checker(iter.Name(), iter.Folder()) {
|
||
continue
|
||
}
|
||
|
||
pageBytes += len(item.Value)
|
||
rsp.Items = append(rsp.Items, item)
|
||
if len(rsp.Items) >= int(req.Limit) || pageBytes >= maxPageBytes {
|
||
t := iter.ContinueToken()
|
||
if iter.Next() {
|
||
rsp.NextPageToken = t
|
||
}
|
||
return iter.Error()
|
||
}
|
||
}
|
||
return iter.Error()
|
||
}
|
||
|
||
var rv int64
|
||
switch req.Source {
|
||
case resourcepb.ListRequest_STORE:
|
||
rv, err = s.backend.ListIterator(ctx, req, iterFunc)
|
||
case resourcepb.ListRequest_HISTORY, resourcepb.ListRequest_TRASH:
|
||
rv, err = s.backend.ListHistory(ctx, req, iterFunc)
|
||
default:
|
||
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid list source: %v", req.Source))
|
||
}
|
||
|
||
if err != nil {
|
||
rsp.Error = AsErrorResult(err)
|
||
return rsp, nil
|
||
}
|
||
|
||
if rv < 1 {
|
||
rsp.Error = &resourcepb.ErrorResult{
|
||
Code: http.StatusInternalServerError,
|
||
Message: fmt.Sprintf("invalid resource version for list: %v", rv),
|
||
}
|
||
return rsp, nil
|
||
}
|
||
rsp.ResourceVersion = rv
|
||
return rsp, err
|
||
}
|
||
|
||
// isTrashItemAuthorized checks if the user has access to the trash item.
|
||
func (s *server) isTrashItemAuthorized(ctx context.Context, iter ListIterator, trashChecker claims.ItemChecker) bool {
|
||
user, ok := claims.AuthInfoFrom(ctx)
|
||
if !ok || user == nil {
|
||
return false
|
||
}
|
||
|
||
partial := &metav1.PartialObjectMetadata{}
|
||
err := json.Unmarshal(iter.Value(), partial)
|
||
if err != nil {
|
||
return false
|
||
}
|
||
|
||
obj, err := utils.MetaAccessor(partial)
|
||
if err != nil {
|
||
return false
|
||
}
|
||
|
||
// Trash is only accessible to admins or the user who deleted the object
|
||
return obj.GetUpdatedBy() == user.GetUID() || trashChecker(iter.Name(), iter.Folder())
|
||
}
|
||
|
||
func (s *server) initWatcher() error {
|
||
var err error
|
||
s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error {
|
||
events, err := s.backend.WatchWriteEvents(s.ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
go func() {
|
||
for v := range events {
|
||
if v == nil {
|
||
s.log.Error("received nil event")
|
||
continue
|
||
}
|
||
// Skip events during batch updates
|
||
if v.PreviousRV < 0 {
|
||
continue
|
||
}
|
||
|
||
s.log.Debug("Server. Streaming Event", "type", v.Type, "previousRV", v.PreviousRV, "group", v.Key.Group, "namespace", v.Key.Namespace, "resource", v.Key.Resource, "name", v.Key.Name)
|
||
s.mostRecentRV.Store(v.ResourceVersion)
|
||
out <- v
|
||
}
|
||
}()
|
||
return nil
|
||
})
|
||
return err
|
||
}
|
||
|
||
//nolint:gocyclo
|
||
func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStore_WatchServer) error {
|
||
ctx := srv.Context()
|
||
|
||
user, ok := claims.AuthInfoFrom(ctx)
|
||
if !ok || user == nil {
|
||
return apierrors.NewUnauthorized("no user found in context")
|
||
}
|
||
|
||
key := req.Options.Key
|
||
checker, err := s.access.Compile(ctx, user, claims.ListRequest{
|
||
Group: key.Group,
|
||
Resource: key.Resource,
|
||
Namespace: key.Namespace,
|
||
Verb: utils.VerbGet,
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if checker == nil {
|
||
return apierrors.NewUnauthorized("not allowed to list anything") // ?? or a single error?
|
||
}
|
||
|
||
// Start listening -- this will buffer any changes that happen while we backfill.
|
||
// If events are generated faster than we can process them, then some events will be dropped.
|
||
// TODO: Think of a way to allow the client to catch up.
|
||
stream, err := s.broadcaster.Subscribe(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer s.broadcaster.Unsubscribe(stream)
|
||
|
||
// Determine a safe starting resource-version for the watch.
|
||
// When the client requests SendInitialEvents we will use the resource-version
|
||
// of the last object returned from the initial list (handled below).
|
||
// When the client supplies an explicit `since` we honour that.
|
||
// In the remaining case (SendInitialEvents == false && since == 0) we need
|
||
// a high-water-mark representing the current state of storage so that we
|
||
// donʼt replay events that happened before the watch was established. Using
|
||
// `mostRecentRV` – which is updated asynchronously by the broadcaster – is
|
||
// subject to races because the broadcaster may not yet have observed the
|
||
// latest committed writes. Instead we ask the backend directly for the
|
||
// current resource-version.
|
||
var mostRecentRV int64
|
||
if !req.SendInitialEvents && req.Since == 0 {
|
||
// We only need the current RV. A cheap way to obtain it is to issue a
|
||
// List with a very small limit and read the listRV returned by the
|
||
// iterator. The callback is a no-op so we avoid materialising any
|
||
// items.
|
||
listReq := &resourcepb.ListRequest{
|
||
Options: req.Options,
|
||
// This has right now no effect, as the list request only uses the limit if it lists from history or trash.
|
||
// It might be worth adding it in a subsequent PR. We only list once during setup of the watch, so it's
|
||
// fine for now.
|
||
Limit: 1,
|
||
}
|
||
|
||
rv, err := s.backend.ListIterator(ctx, listReq, func(ListIterator) error { return nil })
|
||
if err != nil {
|
||
// Fallback to the broadcasterʼs view if the backend lookup fails.
|
||
// This preserves previous behaviour while still eliminating the
|
||
// common race in the majority of cases.
|
||
s.log.Warn("watch: failed to fetch current RV from backend, falling back to broadcaster", "err", err)
|
||
mostRecentRV = s.mostRecentRV.Load()
|
||
} else {
|
||
mostRecentRV = rv
|
||
}
|
||
} else {
|
||
// For all other code-paths we either already have an explicit RV or we
|
||
// will derive it from the initial list below.
|
||
mostRecentRV = s.mostRecentRV.Load()
|
||
}
|
||
|
||
var initialEventsRV int64 // resource version coming from the initial events
|
||
if req.SendInitialEvents {
|
||
// Backfill the stream by adding every existing entities.
|
||
initialEventsRV, err = s.backend.ListIterator(ctx, &resourcepb.ListRequest{Options: req.Options}, func(iter ListIterator) error {
|
||
for iter.Next() {
|
||
if err := iter.Error(); err != nil {
|
||
return err
|
||
}
|
||
if err := srv.Send(&resourcepb.WatchEvent{
|
||
Type: resourcepb.WatchEvent_ADDED,
|
||
Resource: &resourcepb.WatchEvent_Resource{
|
||
Value: iter.Value(),
|
||
Version: iter.ResourceVersion(),
|
||
},
|
||
}); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return iter.Error()
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
if req.SendInitialEvents && req.AllowWatchBookmarks {
|
||
if err := srv.Send(&resourcepb.WatchEvent{
|
||
Type: resourcepb.WatchEvent_BOOKMARK,
|
||
Resource: &resourcepb.WatchEvent_Resource{
|
||
Version: initialEventsRV,
|
||
},
|
||
}); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
var since int64 // resource version to start watching from
|
||
switch {
|
||
case req.SendInitialEvents:
|
||
since = initialEventsRV
|
||
case req.Since == 0:
|
||
since = mostRecentRV
|
||
default:
|
||
since = req.Since
|
||
}
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return nil
|
||
|
||
case event, ok := <-stream:
|
||
if !ok {
|
||
s.log.Debug("watch events closed")
|
||
return nil
|
||
}
|
||
s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name)
|
||
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
|
||
if !checker(event.Key.Name, event.Folder) {
|
||
continue
|
||
}
|
||
|
||
value := event.Value
|
||
// remove the delete marker stored in the value for deleted objects
|
||
if event.Type == resourcepb.WatchEvent_DELETED {
|
||
value = []byte{}
|
||
}
|
||
resp := &resourcepb.WatchEvent{
|
||
Timestamp: event.Timestamp,
|
||
Type: event.Type,
|
||
Resource: &resourcepb.WatchEvent_Resource{
|
||
Value: value,
|
||
Version: event.ResourceVersion,
|
||
},
|
||
}
|
||
if event.PreviousRV > 0 {
|
||
prevObj, err := s.Read(ctx, &resourcepb.ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV})
|
||
if err != nil {
|
||
// This scenario should never happen, but if it does, we should log it and continue
|
||
// sending the event without the previous object. The client will decide what to do.
|
||
s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error)
|
||
} else {
|
||
if prevObj.ResourceVersion != event.PreviousRV {
|
||
s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion)
|
||
return fmt.Errorf("resource version mismatch")
|
||
}
|
||
resp.Previous = &resourcepb.WatchEvent_Resource{
|
||
Value: prevObj.Value,
|
||
Version: prevObj.ResourceVersion,
|
||
}
|
||
}
|
||
}
|
||
if err := srv.Send(resp); err != nil {
|
||
return err
|
||
}
|
||
|
||
if s.storageMetrics != nil {
|
||
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
|
||
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
|
||
if latencySeconds > 0 {
|
||
s.storageMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||
if s.search == nil {
|
||
return nil, fmt.Errorf("search index not configured")
|
||
}
|
||
|
||
return s.search.Search(ctx, req)
|
||
}
|
||
|
||
// GetStats implements ResourceServer.
|
||
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||
if err := s.Init(ctx); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
if s.search == nil {
|
||
// If the backend implements "GetStats", we can use it
|
||
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
|
||
if ok {
|
||
return srv.GetStats(ctx, req)
|
||
}
|
||
return nil, fmt.Errorf("search index not configured")
|
||
}
|
||
return s.search.GetStats(ctx, req)
|
||
}
|
||
|
||
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||
return s.search.ListManagedObjects(ctx, req)
|
||
}
|
||
|
||
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||
return s.search.CountManagedObjects(ctx, req)
|
||
}
|
||
|
||
// IsHealthy implements ResourceServer.
|
||
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||
return s.diagnostics.IsHealthy(ctx, req)
|
||
}
|
||
|
||
// GetBlob implements BlobStore.
|
||
func (s *server) PutBlob(ctx context.Context, req *resourcepb.PutBlobRequest) (*resourcepb.PutBlobResponse, error) {
|
||
if s.blob == nil {
|
||
return &resourcepb.PutBlobResponse{Error: &resourcepb.ErrorResult{
|
||
Message: "blob store not configured",
|
||
Code: http.StatusNotImplemented,
|
||
}}, nil
|
||
}
|
||
|
||
rsp, err := s.blob.PutResourceBlob(ctx, req)
|
||
if err != nil {
|
||
rsp.Error = AsErrorResult(err)
|
||
}
|
||
return rsp, nil
|
||
}
|
||
|
||
func (s *server) getPartialObject(ctx context.Context, key *resourcepb.ResourceKey, rv int64) (utils.GrafanaMetaAccessor, *resourcepb.ErrorResult) {
|
||
if r := verifyRequestKey(key); r != nil {
|
||
return nil, r
|
||
}
|
||
|
||
rsp := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{
|
||
Key: key,
|
||
ResourceVersion: rv,
|
||
})
|
||
if rsp.Error != nil {
|
||
return nil, rsp.Error
|
||
}
|
||
|
||
partial := &metav1.PartialObjectMetadata{}
|
||
err := json.Unmarshal(rsp.Value, partial)
|
||
if err != nil {
|
||
return nil, AsErrorResult(err)
|
||
}
|
||
obj, err := utils.MetaAccessor(partial)
|
||
if err != nil {
|
||
return nil, AsErrorResult(err)
|
||
}
|
||
return obj, nil
|
||
}
|
||
|
||
// GetBlob implements BlobStore.
|
||
func (s *server) GetBlob(ctx context.Context, req *resourcepb.GetBlobRequest) (*resourcepb.GetBlobResponse, error) {
|
||
if s.blob == nil {
|
||
return &resourcepb.GetBlobResponse{Error: &resourcepb.ErrorResult{
|
||
Message: "blob store not configured",
|
||
Code: http.StatusNotImplemented,
|
||
}}, nil
|
||
}
|
||
|
||
var info *utils.BlobInfo
|
||
if req.Uid == "" {
|
||
// The linked blob is stored in the resource metadata attributes
|
||
obj, status := s.getPartialObject(ctx, req.Resource, req.ResourceVersion)
|
||
if status != nil {
|
||
return &resourcepb.GetBlobResponse{Error: status}, nil
|
||
}
|
||
|
||
info = obj.GetBlob()
|
||
if info == nil || info.UID == "" {
|
||
return &resourcepb.GetBlobResponse{Error: &resourcepb.ErrorResult{
|
||
Message: "Resource does not have a linked blob",
|
||
Code: 404,
|
||
}}, nil
|
||
}
|
||
} else {
|
||
info = &utils.BlobInfo{UID: req.Uid}
|
||
}
|
||
|
||
rsp, err := s.blob.GetResourceBlob(ctx, req.Resource, info, req.MustProxyBytes)
|
||
if err != nil {
|
||
rsp.Error = AsErrorResult(err)
|
||
}
|
||
return rsp, nil
|
||
}
|
||
|
||
func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func()) error {
|
||
boff := backoff.New(ctx, backoff.Config{
|
||
MinBackoff: DefaultMinBackoff,
|
||
MaxBackoff: DefaultMaxBackoff,
|
||
MaxRetries: DefaultMaxRetries,
|
||
})
|
||
|
||
var (
|
||
wg sync.WaitGroup
|
||
err error
|
||
)
|
||
wg.Add(1)
|
||
wrapped := func() {
|
||
defer wg.Done()
|
||
runnable()
|
||
}
|
||
for boff.Ongoing() {
|
||
err = s.queue.Enqueue(ctx, tenantID, wrapped)
|
||
if err == nil {
|
||
break
|
||
}
|
||
s.log.Warn("failed to enqueue runnable, retrying",
|
||
"maxRetries", DefaultMaxRetries,
|
||
"tenantID", tenantID,
|
||
"error", err)
|
||
boff.Wait()
|
||
}
|
||
if err != nil {
|
||
s.log.Error("failed to enqueue runnable",
|
||
"maxRetries", DefaultMaxRetries,
|
||
"tenantID", tenantID,
|
||
"error", err)
|
||
return fmt.Errorf("failed to enqueue runnable for tenant %s: %w", tenantID, err)
|
||
}
|
||
wg.Wait()
|
||
return nil
|
||
}
|