Files
2025-06-27 09:29:49 +00:00

336 lines
8.4 KiB
Go

package resource
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"
authlib "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
const grpcMetaKeyCollection = "x-gf-batch-collection"
const grpcMetaKeyRebuildCollection = "x-gf-batch-rebuild-collection"
const grpcMetaKeySkipValidation = "x-gf-batch-skip-validation"
// Logged in trace.
var metadataKeys = []string{
grpcMetaKeyCollection,
grpcMetaKeyRebuildCollection,
grpcMetaKeySkipValidation,
}
func grpcMetaValueIsTrue(vals []string) bool {
return len(vals) == 1 && vals[0] == "true"
}
type BulkRequestIterator interface {
Next() bool
// The next event we should process
Request() *resourcepb.BulkRequest
// Rollback requested
RollbackRequested() bool
}
type BulkProcessingBackend interface {
ProcessBulk(ctx context.Context, setting BulkSettings, iter BulkRequestIterator) *resourcepb.BulkResponse
}
type BulkResourceWriter interface {
io.Closer
Write(ctx context.Context, key *resourcepb.ResourceKey, value []byte) error
// Called when finished writing
CloseWithResults() (*resourcepb.BulkResponse, error)
}
type BulkSettings struct {
// All requests will be within this namespace/group/resource
Collection []*resourcepb.ResourceKey
// The batch will include everything from the collection
// - all existing values will be removed/replaced if the batch completes successfully
RebuildCollection bool
// The byte[] payload and folder has already been validated - no need to decode and verify
SkipValidation bool
}
func (x *BulkSettings) ToMD() metadata.MD {
md := make(metadata.MD)
if len(x.Collection) > 0 {
for _, v := range x.Collection {
md[grpcMetaKeyCollection] = append(md[grpcMetaKeyCollection], SearchID(v))
}
}
if x.RebuildCollection {
md[grpcMetaKeyRebuildCollection] = []string{"true"}
}
if x.SkipValidation {
md[grpcMetaKeySkipValidation] = []string{"true"}
}
return md
}
func NewBulkSettings(md metadata.MD) (BulkSettings, error) {
settings := BulkSettings{}
for k, v := range md {
switch k {
case grpcMetaKeyCollection:
for _, c := range v {
key := &resourcepb.ResourceKey{}
err := ReadSearchID(key, c)
if err != nil {
return settings, fmt.Errorf("error reading collection metadata: %s / %w", c, err)
}
settings.Collection = append(settings.Collection, key)
}
case grpcMetaKeyRebuildCollection:
settings.RebuildCollection = grpcMetaValueIsTrue(v)
case grpcMetaKeySkipValidation:
settings.SkipValidation = grpcMetaValueIsTrue(v)
}
}
return settings, nil
}
// BulkWrite implements ResourceServer.
// All requests must be to the same NAMESPACE/GROUP/RESOURCE
func (s *server) BulkProcess(stream resourcepb.BulkStore_BulkProcessServer) error {
ctx := stream.Context()
ctx, span := s.tracer.Start(ctx, "resource.server.BulkProcess")
defer span.End()
sendAndClose := func(rsp *resourcepb.BulkResponse) error {
span.AddEvent("sendAndClose", trace.WithAttributes(attribute.String("msg", rsp.String())))
return stream.SendAndClose(rsp)
}
user, ok := authlib.AuthInfoFrom(ctx)
if !ok || user == nil {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "no user found in context",
Code: http.StatusUnauthorized,
},
})
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "unable to read metadata gRPC request",
Code: http.StatusPreconditionFailed,
},
})
}
// Add relevant metadata into span.
for _, k := range metadataKeys {
meta := md.Get(k)
if len(meta) > 0 {
span.SetAttributes(attribute.StringSlice(k, meta))
}
}
runner := &batchRunner{
checker: make(map[string]authlib.ItemChecker), // Can create
stream: stream,
span: span,
}
settings, err := NewBulkSettings(md)
if err != nil {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "error reading settings",
Reason: err.Error(),
Code: http.StatusPreconditionFailed,
},
})
}
if len(settings.Collection) < 1 {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "Missing target collection(s) in request header",
Code: http.StatusBadRequest,
},
})
}
if settings.RebuildCollection {
for _, k := range settings.Collection {
// Can we delete the whole collection
rsp, err := s.access.Check(ctx, user, authlib.CheckRequest{
Namespace: k.Namespace,
Group: k.Group,
Resource: k.Resource,
Verb: utils.VerbDeleteCollection,
})
if err != nil || !rsp.Allowed {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: fmt.Sprintf("Requester must be able to: %s", utils.VerbDeleteCollection),
Code: http.StatusForbidden,
},
})
}
// This will be called for each request -- with the folder ID
runner.checker[NSGR(k)], err = s.access.Compile(ctx, user, authlib.ListRequest{
Namespace: k.Namespace,
Group: k.Group,
Resource: k.Resource,
Verb: utils.VerbCreate,
})
if err != nil {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "Unable to check `create` permission",
Code: http.StatusForbidden,
},
})
}
}
} else {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "Bulk currently only supports RebuildCollection",
Code: http.StatusBadRequest,
},
})
}
backend, ok := s.backend.(BulkProcessingBackend)
if !ok {
return sendAndClose(&resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Message: "The server backend does not support batch processing",
Code: http.StatusNotImplemented,
},
})
}
// BulkProcess requests
rsp := backend.ProcessBulk(ctx, settings, runner)
if rsp == nil {
rsp = &resourcepb.BulkResponse{
Error: &resourcepb.ErrorResult{
Code: http.StatusInternalServerError,
Message: "Nothing returned from process batch",
},
}
}
if runner.err != nil {
rsp.Error = AsErrorResult(runner.err)
}
if rsp.Error == nil && s.search != nil {
// Rebuild any changed indexes
for _, summary := range rsp.Summary {
_, _, err := s.search.build(ctx, NamespacedResource{
Namespace: summary.Namespace,
Group: summary.Group,
Resource: summary.Resource,
}, summary.Count, summary.ResourceVersion)
if err != nil {
s.log.Warn("error building search index after batch load", "err", err)
rsp.Error = &resourcepb.ErrorResult{
Code: http.StatusInternalServerError,
Message: "err building search index: " + summary.Resource,
Reason: err.Error(),
}
}
}
}
return sendAndClose(rsp)
}
var (
_ BulkRequestIterator = (*batchRunner)(nil)
)
type batchRunner struct {
stream resourcepb.BulkStore_BulkProcessServer
rollback bool
request *resourcepb.BulkRequest
err error
checker map[string]authlib.ItemChecker
span trace.Span
}
// Next implements BulkRequestIterator.
func (b *batchRunner) Next() bool {
if b.rollback {
return true
}
b.request, b.err = b.stream.Recv()
if errors.Is(b.err, io.EOF) {
b.err = nil
b.rollback = false
b.request = nil
return false
}
if b.err != nil {
b.rollback = true
b.span.AddEvent("next", trace.WithAttributes(attribute.String("error", b.err.Error())))
return true
}
if b.request != nil {
key := b.request.Key
k := NSGR(key)
checker, ok := b.checker[k]
if !ok {
b.err = fmt.Errorf("missing access control for: %s", k)
b.rollback = true
} else if !checker(key.Name, b.request.Folder) {
b.err = fmt.Errorf("not allowed to create resource")
b.rollback = true
}
// Mention resource in the span.
attrs := []attribute.KeyValue{
attribute.String("key", nsgrWithName(key)),
}
if b.err != nil {
attrs = append(attrs, attribute.String("error", b.err.Error()))
}
b.span.AddEvent("next", trace.WithAttributes(attrs...))
return true
}
return false
}
// Request implements BulkRequestIterator.
func (b *batchRunner) Request() *resourcepb.BulkRequest {
if b.rollback {
return nil
}
return b.request
}
// RollbackRequested implements BulkRequestIterator.
func (b *batchRunner) RollbackRequested() bool {
if b.rollback {
b.rollback = false // break iterator
return true
}
return false
}